From 7b5da7ab27aab6753708c71395764afe7e9e6e26 Mon Sep 17 00:00:00 2001 From: Oren Date: Mon, 25 Nov 2024 16:52:53 +0200 Subject: [PATCH 1/8] define new optimizer metrics --- protocol/metrics/consumer_metrics_manager.go | 162 +++++++++++++++++++ utils/score/score_store.go | 1 + 2 files changed, 163 insertions(+) diff --git a/protocol/metrics/consumer_metrics_manager.go b/protocol/metrics/consumer_metrics_manager.go index b3ac3e910e..cd4c3e1213 100644 --- a/protocol/metrics/consumer_metrics_manager.go +++ b/protocol/metrics/consumer_metrics_manager.go @@ -8,6 +8,7 @@ import ( "time" "github.com/lavanet/lava/v4/utils" + scoreutils "github.com/lavanet/lava/v4/utils/score" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -64,6 +65,22 @@ type ConsumerMetricsManager struct { relayProcessingLatencyBeforeProvider *prometheus.GaugeVec relayProcessingLatencyAfterProvider *prometheus.GaugeVec averageProcessingLatency map[string]*LatencyTracker + + // optimizer metrics + optimizerProviderScore *prometheus.GaugeVec + optimizerProviderLatency *prometheus.GaugeVec + optimizerProviderSync *prometheus.GaugeVec + optimizerProviderAvailability *prometheus.GaugeVec + optimizerProviderTier *prometheus.GaugeVec + optimizerTierChance *prometheus.GaugeVec + + // refactored optimizer metrics + optimizerRefactorProviderScore *prometheus.GaugeVec + optimizerRefactorProviderLatency *prometheus.GaugeVec + optimizerRefactorProviderSync *prometheus.GaugeVec + optimizerRefactorProviderAvailability *prometheus.GaugeVec + optimizerRefactorProviderTier *prometheus.GaugeVec + optimizerRefactorTierChance *prometheus.GaugeVec } type ConsumerMetricsManagerOptions struct { @@ -210,6 +227,68 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM Help: "average latency of processing a successful relay after it is received from the provider in µs (10^6)", }, []string{"spec", "apiInterface"}) + optimizerProviderScore := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_provider_score", + Help: "[Optimizer] The total score of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerProviderLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_provider_latency", + Help: "[Optimizer] The latency of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerProviderSync := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_provider_sync", + Help: "[Optimizer] The sync of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerProviderAvailability := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_provider_availability", + Help: "[Optimizer] The availability of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerProviderTier := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_provider_tier", + Help: "[Optimizer] The tier of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerTierChance := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_provider_tiers_chances", + Help: "[Optimizer] The chances of a tier being selected by the optimizer", + }, []string{"spec", "api_interface", "tier", "epoch"}) + + // + + 
optimizerRefactorProviderScore := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_refactor_provider_score", + Help: "[Optimizer Refactor] The total score of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerRefactorProviderLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_refactor_provider_latency", + Help: "[Optimizer Refactor] The latency of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerRefactorProviderSync := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_refactor_provider_sync", + Help: "[Optimizer Refactor] The sync of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerRefactorProviderAvailability := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_refactor_provider_availability", + Help: "[Optimizer Refactor] The availability of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerRefactorProviderTier := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_refactor_provider_tier", + Help: "[Optimizer Refactor] The tier of a provider", + }, []string{"spec", "api_interface", "provider_address", "epoch"}) + + optimizerRefactorTierChance := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_optimizer_refactor_provider_tiers_chances", + Help: "[Optimizer Refactor] The chances of a tier being selected by the optimizer", + }, []string{"spec", "api_interface", "tier", "epoch"}) + // Register the metrics with the Prometheus registry. prometheus.MustRegister(totalCURequestedMetric) prometheus.MustRegister(totalRelaysRequestedMetric) @@ -237,6 +316,18 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric) prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric) prometheus.MustRegister(totalWsSubscriptionDissconnectMetric) + prometheus.MustRegister(optimizerProviderScore) + prometheus.MustRegister(optimizerProviderLatency) + prometheus.MustRegister(optimizerProviderSync) + prometheus.MustRegister(optimizerProviderAvailability) + prometheus.MustRegister(optimizerProviderTier) + prometheus.MustRegister(optimizerTierChance) + prometheus.MustRegister(optimizerRefactorProviderScore) + prometheus.MustRegister(optimizerRefactorProviderLatency) + prometheus.MustRegister(optimizerRefactorProviderSync) + prometheus.MustRegister(optimizerRefactorProviderAvailability) + prometheus.MustRegister(optimizerRefactorProviderTier) + prometheus.MustRegister(optimizerRefactorTierChance) consumerMetricsManager := &ConsumerMetricsManager{ totalCURequestedMetric: totalCURequestedMetric, @@ -270,6 +361,20 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider, relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider, averageProcessingLatency: map[string]*LatencyTracker{}, + + optimizerProviderScore: optimizerProviderScore, + optimizerProviderLatency: optimizerProviderLatency, + optimizerProviderSync: optimizerProviderSync, + optimizerProviderAvailability: optimizerProviderAvailability, + optimizerProviderTier: optimizerProviderTier, + optimizerTierChance: optimizerTierChance, + + optimizerRefactorProviderScore: optimizerRefactorProviderScore, + 
optimizerRefactorProviderLatency: optimizerRefactorProviderLatency, + optimizerRefactorProviderSync: optimizerRefactorProviderSync, + optimizerRefactorProviderAvailability: optimizerRefactorProviderAvailability, + optimizerRefactorProviderTier: optimizerRefactorProviderTier, + optimizerRefactorTierChance: optimizerRefactorTierChance, } http.Handle("/metrics", promhttp.Handler()) @@ -545,3 +650,60 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain } pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc() } + +func (pme *ConsumerMetricsManager) SetOptimizerProviderScoreMetric(chainId string, apiInterface string, providerAddress string, epoch uint64, scoreType string, refactored bool, score float64) { + if pme == nil { + return + } + + switch scoreType { + case scoreutils.LatencyScoreType_Refactor: + if refactored { + pme.optimizerRefactorProviderLatency.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } else { + pme.optimizerProviderLatency.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } + case scoreutils.SyncScoreType_Refactor: + if refactored { + pme.optimizerRefactorProviderSync.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } else { + pme.optimizerProviderSync.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } + case scoreutils.AvailabilityScoreType_Refactor: + if refactored { + pme.optimizerRefactorProviderAvailability.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } else { + pme.optimizerProviderAvailability.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } + case scoreutils.TotalScoreType_Refactor: + if refactored { + pme.optimizerRefactorProviderScore.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } else { + pme.optimizerProviderScore.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score) + } + default: + utils.LavaFormatError("Unknown score type", nil, utils.Attribute{Key: "scoreType", Value: scoreType}) + } +} + +func (pme *ConsumerMetricsManager) SetOptimizerProviderTierMetric(chainId string, apiInterface string, providerAddress string, epoch uint64, refactored bool, tier int) { + if pme == nil { + return + } + if refactored { + pme.optimizerRefactorProviderTier.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(float64(tier)) + } else { + pme.optimizerProviderTier.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(float64(tier)) + } +} + +func (pme *ConsumerMetricsManager) SetOptimizerTierChanceMetric(chainId string, apiInterface string, tier int, epoch uint64, refactored bool, chance float64) { + if pme == nil { + return + } + if refactored { + pme.optimizerRefactorTierChance.WithLabelValues(chainId, apiInterface, fmt.Sprintf("%d", tier), fmt.Sprintf("%d", epoch)).Set(chance) + } else { + pme.optimizerTierChance.WithLabelValues(chainId, apiInterface, fmt.Sprintf("%d", tier), fmt.Sprintf("%d", epoch)).Set(chance) + } +} diff --git a/utils/score/score_store.go b/utils/score/score_store.go index aec8193c89..024931357f 100644 --- a/utils/score/score_store.go +++ b/utils/score/score_store.go @@ -299,6 +299,7 @@ const ( LatencyScoreType_Refactor = "latency" 
SyncScoreType_Refactor = "sync" AvailabilityScoreType_Refactor = "availability" + TotalScoreType_Refactor = "total" // Worst score results for each QoS excellence metric for truncation WorstLatencyScore_Refactor float64 = 30 // seconds From 6d959f8b1d371308a703c6138b3839d818667238 Mon Sep 17 00:00:00 2001 From: Oren Date: Mon, 25 Nov 2024 19:29:20 +0200 Subject: [PATCH 2/8] update optimizer refactor to use metrics and use Strategy instead of Strategy_refactor --- .../provideroptimizer/provider_optimizer.go | 37 ++++++-- .../provider_optimizer_refactor.go | 95 ++++++++----------- .../provider_optimizer_refactor_test.go | 40 ++++---- 3 files changed, 92 insertions(+), 80 deletions(-) diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 6f9198a9e9..bc216cfd38 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -80,18 +80,41 @@ type ProviderData struct { SyncRaw score.ScoreStore // will be used when reporting reputation to the node (Sync = SyncRaw / baseSync) } +// Strategy defines the pairing strategy. Using different strategies allow users to determine +// the providers type they'll be paired with: providers with low latency, fresh sync and more. type Strategy int const ( - STRATEGY_BALANCED Strategy = iota - STRATEGY_LATENCY - STRATEGY_SYNC_FRESHNESS - STRATEGY_COST - STRATEGY_PRIVACY - STRATEGY_ACCURACY - STRATEGY_DISTRIBUTED + STRATEGY_BALANCED Strategy = iota + STRATEGY_LATENCY // prefer low latency + STRATEGY_SYNC_FRESHNESS // prefer better sync + STRATEGY_COST // prefer low CU cost (minimize optimizer exploration) + STRATEGY_PRIVACY // prefer pairing with a single provider (not fully implemented) + STRATEGY_ACCURACY // encourage optimizer exploration (higher cost) + STRATEGY_DISTRIBUTED // prefer pairing with different providers (slightly minimize optimizer exploration) ) +func (s Strategy) String() string { + switch s { + case STRATEGY_BALANCED: + return "balanced" + case STRATEGY_LATENCY: + return "latency" + case STRATEGY_SYNC_FRESHNESS: + return "sync_freshness" + case STRATEGY_COST: + return "cost" + case STRATEGY_PRIVACY: + return "privacy" + case STRATEGY_ACCURACY: + return "accuracy" + case STRATEGY_DISTRIBUTED: + return "distributed" + } + + return "" +} + func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64, epoch uint64) { po.selectionWeighter.SetWeights(weights) diff --git a/protocol/provideroptimizer/provider_optimizer_refactor.go b/protocol/provideroptimizer/provider_optimizer_refactor.go index f6c603906d..48f25e8295 100644 --- a/protocol/provideroptimizer/provider_optimizer_refactor.go +++ b/protocol/provideroptimizer/provider_optimizer_refactor.go @@ -9,6 +9,7 @@ import ( "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dgraph-io/ristretto" + "github.com/lavanet/lava/v4/protocol/metrics" "github.com/lavanet/lava/v4/utils" "github.com/lavanet/lava/v4/utils/lavaslices" "github.com/lavanet/lava/v4/utils/rand" @@ -49,7 +50,7 @@ type cacheInf_Refactor interface { } type ProviderOptimizer_Refactor struct { - strategy Strategy_Refactor + strategy Strategy providersStorage cacheInf_Refactor providerRelayStats *ristretto.Cache // used to decide on the half time of the decay averageBlockTime time.Duration @@ -76,49 +77,13 @@ type ProviderData_Refactor struct { SyncBlock uint64 // will be used to calculate the probability of block error } -// Strategy_Refactor defines the pairing strategy. 
Using different -// strategies allow users to determine the providers type they'll -// be paired with: providers with low latency, fresh sync and more. -type Strategy_Refactor int - -const ( - StrategyBalanced_Refactor Strategy_Refactor = iota - StrategyLatency_Refactor // prefer low latency - StrategySyncFreshness_Refactor // prefer better sync - StrategyCost_Refactor // prefer low CU cost (minimize optimizer exploration) - StrategyPrivacy_Refactor // prefer pairing with a single provider (not fully implemented) - StrategyAccuracy_Refactor // encourage optimizer exploration (higher cost) - StrategyDistributed_Refactor // prefer pairing with different providers (slightly minimize optimizer exploration) -) - -func (s Strategy_Refactor) String() string { - switch s { - case StrategyBalanced_Refactor: - return "balanced" - case StrategyLatency_Refactor: - return "latency" - case StrategySyncFreshness_Refactor: - return "sync_freshness" - case StrategyCost_Refactor: - return "cost" - case StrategyPrivacy_Refactor: - return "privacy" - case StrategyAccuracy_Refactor: - return "accuracy" - case StrategyDistributed_Refactor: - return "distributed" - } - - return "" -} - // GetStrategyFactor gets the appropriate factor to multiply the sync factor // with according to the strategy -func (s Strategy_Refactor) GetStrategyFactor() math.LegacyDec { +func (s Strategy) GetStrategyFactor() math.LegacyDec { switch s { - case StrategyLatency_Refactor: + case STRATEGY_LATENCY: return pairingtypes.LatencyStrategyFactor - case StrategySyncFreshness_Refactor: + case STRATEGY_SYNC_FRESHNESS: return pairingtypes.SyncFreshnessStrategyFactor } @@ -199,9 +164,24 @@ func (po *ProviderOptimizer_Refactor) AppendProbeRelayData_Refactor(providerAddr ) } -func (po *ProviderOptimizer_Refactor) CalculateSelectionTiers_Refactor(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration_Refactor) { +func (po *ProviderOptimizer_Refactor) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport { + selectionTier, _, providersScores := po.CalculateSelectionTiers_Refactor(allAddresses, ignoredProviders, cu, requestedBlock) + reports := []*metrics.OptimizerQoSReport{} + + rawScores := selectionTier.GetRawScores() + for idx, entry := range rawScores { + qosReport := providersScores[entry.Address] + qosReport.EntryIndex = idx + reports = append(reports, qosReport) + } + + return reports +} + +func (po *ProviderOptimizer_Refactor) CalculateSelectionTiers_Refactor(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration_Refactor, map[string]*metrics.OptimizerQoSReport) { explorationCandidate := Exploration_Refactor{address: "", time: time.Now().Add(time.Hour)} selectionTier := NewSelectionTier() + providerScores := make(map[string]*metrics.OptimizerQoSReport) for _, providerAddress := range allAddresses { if _, ok := ignoredProviders[providerAddress]; ok { // ignored provider, skip it @@ -219,7 +199,7 @@ func (po *ProviderOptimizer_Refactor) CalculateSelectionTiers_Refactor(allAddres fmt.Errorf("could not get QoS excellece report for provider"), utils.LogAttr("provider", providerAddress), ) - return NewSelectionTier(), Exploration_Refactor{} + return NewSelectionTier(), Exploration_Refactor{}, providerScores } utils.LavaFormatTrace("[Optimizer] scores information", @@ -240,7 +220,7 @@ func (po 
*ProviderOptimizer_Refactor) CalculateSelectionTiers_Refactor(allAddres utils.LogAttr("provider", providerAddress), utils.LogAttr("requested_block", requestedBlock), ) - return NewSelectionTier(), Exploration_Refactor{} + return NewSelectionTier(), Exploration_Refactor{}, providerScores } score, err := qos.ComputeQoSExcellence_Refactor(opts...) if err != nil { @@ -248,8 +228,17 @@ func (po *ProviderOptimizer_Refactor) CalculateSelectionTiers_Refactor(allAddres utils.LogAttr("provider", providerAddress), utils.LogAttr("qos_report", qos.String()), ) - return NewSelectionTier(), Exploration_Refactor{} + return NewSelectionTier(), Exploration_Refactor{}, providerScores } + + providerScores[providerAddress] = &metrics.OptimizerQoSReport{ + ProviderAddress: providerAddress, + SyncScore: qos.Sync.MustFloat64(), + LatencyScore: qos.Latency.MustFloat64(), + AvailabilityScore: qos.Availability.MustFloat64(), + GenericScore: score.MustFloat64(), + } + selectionTier.AddScore(providerAddress, score.MustFloat64()) // check if candidate for exploration @@ -258,12 +247,12 @@ func (po *ProviderOptimizer_Refactor) CalculateSelectionTiers_Refactor(allAddres explorationCandidate = Exploration_Refactor{address: providerAddress, time: lastUpdateTime} } } - return selectionTier, explorationCandidate + return selectionTier, explorationCandidate, providerScores } // returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top func (po *ProviderOptimizer_Refactor) ChooseProvider_Refactor(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) { - selectionTier, explorationCandidate := po.CalculateSelectionTiers_Refactor(allAddresses, ignoredProviders, cu, requestedBlock) + selectionTier, explorationCandidate, _ := po.CalculateSelectionTiers_Refactor(allAddresses, ignoredProviders, cu, requestedBlock) if selectionTier.ScoresCount() == 0 { // no providers to choose from return []string{}, -1 @@ -358,15 +347,15 @@ func (po *ProviderOptimizer_Refactor) shouldExplore_Refactor(currentNumProviders } explorationChance := DefaultExplorationChance_Refactor switch po.strategy { - case StrategyLatency_Refactor: + case STRATEGY_LATENCY: return true // we want a lot of parallel tries on latency - case StrategyAccuracy_Refactor: + case STRATEGY_ACCURACY: return true - case StrategyCost_Refactor: + case STRATEGY_COST: explorationChance = CostExplorationChance_Refactor - case StrategyDistributed_Refactor: + case STRATEGY_DISTRIBUTED: explorationChance = DefaultExplorationChance_Refactor * 0.25 - case StrategyPrivacy_Refactor: + case STRATEGY_PRIVACY: return false // only one at a time } return rand.Float64() < explorationChance @@ -495,7 +484,7 @@ func (po *ProviderOptimizer_Refactor) getRelayStatsTimes_Refactor(providerAddres return nil } -func NewProviderOptimizer_Refactor(strategy Strategy_Refactor, averageBlockTIme time.Duration, wantedNumProvidersInConcurrency uint) *ProviderOptimizer_Refactor { +func NewProviderOptimizer_Refactor(strategy Strategy, averageBlockTIme time.Duration, wantedNumProvidersInConcurrency uint) *ProviderOptimizer_Refactor { cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true}) if err != nil { utils.LavaFormatFatal("failed setting up cache for queries", err) @@ -504,7 +493,7 @@ func NewProviderOptimizer_Refactor(strategy 
Strategy_Refactor, averageBlockTIme if err != nil { utils.LavaFormatFatal("failed setting up cache for queries", err) } - if strategy == StrategyPrivacy_Refactor { + if strategy == STRATEGY_PRIVACY { // overwrite wantedNumProvidersInConcurrency = 1 } diff --git a/protocol/provideroptimizer/provider_optimizer_refactor_test.go b/protocol/provideroptimizer/provider_optimizer_refactor_test.go index 505cd3fdc4..99c78b6a00 100644 --- a/protocol/provideroptimizer/provider_optimizer_refactor_test.go +++ b/protocol/provideroptimizer/provider_optimizer_refactor_test.go @@ -20,7 +20,7 @@ const ( func setupProviderOptimizer_Refactor(maxProvidersCount uint) *ProviderOptimizer_Refactor { averageBlockTIme := TEST_AVERAGE_BLOCK_TIME_Refactor - return NewProviderOptimizer_Refactor(StrategyBalanced_Refactor, averageBlockTIme, maxProvidersCount) + return NewProviderOptimizer_Refactor(STRATEGY_BALANCED, averageBlockTIme, maxProvidersCount) } type providersGenerator_Refactor struct { @@ -156,7 +156,7 @@ func TestProviderOptimizerBasicRelayData_Refactor(t *testing.T) { // there's a chance that some of the worst providers will be in part of a higher tier // because of a high minimum entries value, so filter the providers that are only in the worst tier - selectionTier, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) tier3Entries := selectionTier.GetTier(3, providerOptimizer.OptimizerNumTiers, 1) tier2Entries := selectionTier.GetTier(2, providerOptimizer.OptimizerNumTiers, 1) worstTierEntries := map[string]struct{}{} @@ -302,7 +302,7 @@ func TestProviderOptimizerAvailabilityBlockError_Refactor(t *testing.T) { // make the top tier chance to be 70% time.Sleep(4 * time.Millisecond) - selectionTier, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) tierChances := selectionTier.ShiftTierChance(OptimizerNumTiers, map[int]float64{0: ATierChance, OptimizerNumTiers - 1: LastTierChance}) require.Greater(t, tierChances[0], 0.7, tierChances) @@ -433,21 +433,21 @@ func TestProviderOptimizerExploration_Refactor(t *testing.T) { } // with a cost strategy we expect exploration at a 10% rate - providerOptimizer.strategy = StrategyBalanced_Refactor // that's the default but to be explicit - providerOptimizer.wantedNumProvidersInConcurrency = 2 // that's in the constructor but to be explicit + providerOptimizer.strategy = STRATEGY_BALANCED // that's the default but to be explicit + providerOptimizer.wantedNumProvidersInConcurrency = 2 // that's in the constructor but to be explicit iterations := 10000 exploration = testProvidersExploration(iterations) require.Less(t, exploration, float64(1.4)*float64(iterations)*DefaultExplorationChance_Refactor) // allow mistake buffer of 40% because of randomness require.Greater(t, exploration, float64(0.6)*float64(iterations)*DefaultExplorationChance_Refactor) // allow mistake buffer of 40% because of randomness // with a cost strategy we expect exploration to happen once in 100 samples - providerOptimizer.strategy = StrategyCost_Refactor + providerOptimizer.strategy = STRATEGY_COST exploration = testProvidersExploration(iterations) require.Less(t, exploration, 
float64(1.4)*float64(iterations)*CostExplorationChance_Refactor) // allow mistake buffer of 40% because of randomness require.Greater(t, exploration, float64(0.6)*float64(iterations)*CostExplorationChance_Refactor) // allow mistake buffer of 40% because of randomness // privacy disables exploration - providerOptimizer.strategy = StrategyPrivacy_Refactor + providerOptimizer.strategy = STRATEGY_PRIVACY exploration = testProvidersExploration(iterations) require.Equal(t, exploration, float64(0)) } @@ -476,14 +476,14 @@ func TestProviderOptimizerSyncScore_Refactor(t *testing.T) { sampleTime = sampleTime.Add(time.Millisecond * 5) } time.Sleep(4 * time.Millisecond) - selectionTier, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) tier0 := selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty // we have the best score on the top tier and it's sorted require.Equal(t, providersGen.providersAddresses[chosenIndex], tier0[0].Address) // now choose with a specific block that all providers have - selectionTier, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, int64(syncBlock)) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, int64(syncBlock)) tier0 = selectionTier.GetTier(0, 4, 3) for idx := range tier0 { // sync score doesn't matter now so the tier0 is recalculated and chosenIndex has worst latency @@ -546,38 +546,38 @@ func TestProviderOptimizerStrategiesScoring_Refactor(t *testing.T) { providerOptimizer.appendRelayData_Refactor(providersGen.providersAddresses[1], normalLatency, true, cu, improvedBlock, sampleTime) time.Sleep(4 * time.Millisecond) - providerOptimizer.strategy = StrategyBalanced_Refactor + providerOptimizer.strategy = STRATEGY_BALANCED // a balanced strategy should pick provider 2 because of it's high availability - selectionTier, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) tier0 := selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty // we have the best score on the top tier and it's sorted require.Equal(t, providersGen.providersAddresses[2], tier0[0].Address) - providerOptimizer.strategy = StrategyCost_Refactor + providerOptimizer.strategy = STRATEGY_COST // with a cost strategy we expect the same as balanced - selectionTier, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty // we have the best score on the top tier and it's sorted require.Equal(t, providersGen.providersAddresses[2], tier0[0].Address) - providerOptimizer.strategy = StrategyLatency_Refactor + providerOptimizer.strategy = STRATEGY_LATENCY // latency strategy should pick the best latency - selectionTier, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, cu, requestBlock) + 
selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, cu, requestBlock) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[0], tier0[0].Address) - providerOptimizer.strategy = StrategySyncFreshness_Refactor + providerOptimizer.strategy = STRATEGY_SYNC_FRESHNESS // freshness strategy should pick the most advanced provider - selectionTier, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, cu, requestBlock) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, cu, requestBlock) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[1], tier0[0].Address) // but if we request a past block, then it doesnt matter and we choose by latency: - selectionTier, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, cu, int64(syncBlock)) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, cu, int64(syncBlock)) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[0], tier0[0].Address) @@ -682,7 +682,7 @@ func TestProviderOptimizerWeights_Refactor(t *testing.T) { } // verify 0 has the best score - selectionTier, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) tier0 := selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[0], tier0[0].Address) @@ -720,7 +720,7 @@ func TestProviderOptimizerTiers_Refactor(t *testing.T) { time.Sleep(4 * time.Millisecond) } } - selectionTier, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers_Refactor(providersGen.providersAddresses, nil, cu, requestBlock) shiftedChances := selectionTier.ShiftTierChance(4, map[int]float64{0: 0.75}) require.NotZero(t, shiftedChances[3]) // if we pick by sync, provider 0 is in the top tier and should be selected very often From 6252272a6519dbc8389e3aa4ac0b21e68ea70953 Mon Sep 17 00:00:00 2001 From: Oren Date: Tue, 26 Nov 2024 14:59:45 +0200 Subject: [PATCH 3/8] add metrics for refactored optimizer --- protocol/common/cobra_common.go | 7 ++- .../metrics/consumer_optimizer_qos_client.go | 61 +++++++++++++++++-- protocol/rpcconsumer/rpcconsumer.go | 42 +++++++++---- 3 files changed, 90 insertions(+), 20 deletions(-) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 338b003f67..880acb4ae1 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -40,9 +40,10 @@ const ( SetProviderOptimizerNumberOfTiersToCreate = "set-provider-optimizer-number-of-tiers-to-create" // optimizer qos server flags - 
OptimizerQosServerAddressFlag = "optimizer-qos-server-address" // address of the optimizer qos server to send the qos reports - OptimizerQosServerPushIntervalFlag = "optimizer-qos-push-interval" // interval to push the qos reports to the optimizer qos server - OptimizerQosServerSamplingIntervalFlag = "optimizer-qos-sampling-interval" // interval to sample the qos reports + OptimizerQosServerAddressFlag = "optimizer-qos-server-address" // address of the optimizer qos server to send the qos reports + OptimizerQosServerAddressRefactorFlag = "optimizer-qos-server-address-refactor" // address of the refactored optimizer qos server to send the qos reports + OptimizerQosServerPushIntervalFlag = "optimizer-qos-push-interval" // interval to push the qos reports to the optimizer qos server + OptimizerQosServerSamplingIntervalFlag = "optimizer-qos-sampling-interval" // interval to sample the qos reports // websocket flags RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection" BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded" diff --git a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go index f204107e17..10f84391b0 100644 --- a/protocol/metrics/consumer_optimizer_qos_client.go +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -21,9 +21,11 @@ var ( ) type ConsumerOptimizerQoSClient struct { - consumerOrigin string - queueSender *QueueSender - optimizers map[string]OptimizerInf // keys are chain ids + consumerOrigin string + queueSender *QueueSender + queueSenderRefactored *QueueSender + optimizers map[string]OptimizerInf // keys are chain ids + optimizersRefactored map[string]OptimizerInf // keys are chain ids // keys are chain ids, values are maps with provider addresses as keys chainIdToProviderToRelaysCount map[string]map[string]uint64 chainIdToProviderToNodeErrorsCount map[string]map[string]uint64 @@ -68,7 +70,7 @@ type OptimizerInf interface { CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport } -func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { +func NewConsumerOptimizerQoSClient(endpointAddress string, endpointAddressRefactor string, interval ...time.Duration) *ConsumerOptimizerQoSClient { hostname, err := os.Hostname() if err != nil { utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err) @@ -78,7 +80,9 @@ func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Dura return &ConsumerOptimizerQoSClient{ consumerOrigin: hostname, queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...), + queueSenderRefactored: NewQueueSender(endpointAddressRefactor, "ConsumerOptimizerQoSRefactored", nil, interval...), optimizers: map[string]OptimizerInf{}, + optimizersRefactored: map[string]OptimizerInf{}, chainIdToProviderToRelaysCount: map[string]map[string]uint64{}, chainIdToProviderToNodeErrorsCount: map[string]map[string]uint64{}, chainIdToProviderToEpochToStake: map[string]map[string]map[uint64]int64{}, @@ -147,6 +151,27 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz coqc.queueSender.appendQueue(optimizerQoSReportToSend) } +func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReportRefactored(report *OptimizerQoSReport, chainId string, epoch uint64) { + // must be called under read lock + + 
optimizerQoSReportToSend := optimizerQoSReportToSend{ + Timestamp: time.Now(), + ConsumerOrigin: coqc.consumerOrigin, + SyncScore: report.SyncScore, + AvailabilityScore: report.AvailabilityScore, + LatencyScore: report.LatencyScore, + GenericScore: report.GenericScore, + ProviderAddress: report.ProviderAddress, + EntryIndex: report.EntryIndex, + ChainId: chainId, + Epoch: epoch, + NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress), + ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch), + } + + coqc.queueSenderRefactored.appendQueue(optimizerQoSReportToSend) +} + func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() { coqc.lock.RLock() // we only read from the maps here defer coqc.lock.RUnlock() @@ -168,6 +193,18 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() { coqc.appendOptimizerQoSReport(report, chainId, currentEpoch) } } + + for chainId, optimizer := range coqc.optimizersRefactored { + providersMap, ok := coqc.chainIdToProviderToEpochToStake[chainId] + if !ok { + continue + } + + reports := optimizer.CalculateQoSScoresForMetrics(maps.Keys(providersMap), ignoredProviders, cu, requestedBlock) + for _, report := range reports { + coqc.appendOptimizerQoSReportRefactored(report, chainId, currentEpoch) + } + } } func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx context.Context, samplingInterval time.Duration) { @@ -205,6 +242,22 @@ func (coqc *ConsumerOptimizerQoSClient) RegisterOptimizer(optimizer OptimizerInf coqc.optimizers[chainId] = optimizer } +func (coqc *ConsumerOptimizerQoSClient) RegisterOptimizerRefactored(optimizer OptimizerInf, chainId string) { + if coqc == nil { + return + } + + coqc.lock.Lock() + defer coqc.lock.Unlock() + + if _, found := coqc.optimizersRefactored[chainId]; found { + utils.LavaFormatWarning("Optimizer refactored already registered for chain", nil, utils.LogAttr("chainId", chainId)) + return + } + + coqc.optimizersRefactored[chainId] = optimizer +} + func (coqc *ConsumerOptimizerQoSClient) incrementStoreCounter(store map[string]map[string]uint64, chainId, providerAddress string) { // must be called under write lock if coqc == nil { diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index bfd554925e..736c851532 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -101,11 +101,12 @@ type ConsumerStateTrackerInf interface { } type AnalyticsServerAddresses struct { - AddApiMethodCallsMetrics bool - MetricsListenAddress string - RelayServerAddress string - ReportsAddressFlag string - OptimizerQoSAddress string + AddApiMethodCallsMetrics bool + MetricsListenAddress string + RelayServerAddress string + ReportsAddressFlag string + OptimizerQoSAddress string + OptimizerQoSAddressRefactor string } type RPCConsumer struct { consumerStateTracker ConsumerStateTrackerInf @@ -136,9 +137,9 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics}) // start up prometheus metrics consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient - if 
options.analyticsServerAddresses.OptimizerQoSAddress != "" { - consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client - consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client + if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSAddressRefactor != "" { + consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, options.analyticsServerAddresses.OptimizerQoSAddressRefactor, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client + consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client } rpcConsumerMetrics, err := metrics.NewRPCConsumerLogs(consumerMetricsManager, consumerUsageServeManager, consumerOptimizerQoSClient) @@ -183,6 +184,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt } optimizers := &common.SafeSyncMap[string, *provideroptimizer.ProviderOptimizer]{} + optimizersRefactored := &common.SafeSyncMap[string, *provideroptimizer.ProviderOptimizer_Refactor]{} consumerConsistencies := &common.SafeSyncMap[string, *ConsumerConsistency]{} finalizationConsensuses := &common.SafeSyncMap[string, *finalizationconsensus.FinalizationConsensus]{} @@ -238,6 +240,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt _, averageBlockTime, _, _ := chainParser.ChainBlockStats() var optimizer *provideroptimizer.ProviderOptimizer + var optimizerRefactored *provideroptimizer.ProviderOptimizer_Refactor var consumerConsistency *ConsumerConsistency var finalizationConsensus *finalizationconsensus.FinalizationConsensus getOrCreateChainAssets := func() error { @@ -245,6 +248,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt chainMutexes[chainID].Lock() defer chainMutexes[chainID].Unlock() var loaded bool + var loadedRefactored bool var err error baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better @@ -256,10 +260,20 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) } + // Create / Use existing optimizer refactored + newOptimizerRefactored := provideroptimizer.NewProviderOptimizer_Refactor(options.strategy, averageBlockTime, options.maxConcurrentProviders) + optimizerRefactored, loadedRefactored, err = optimizersRefactored.LoadOrStore(chainID, newOptimizerRefactored) + if err != nil { + return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) + } + if !loaded { // if this is a new optimizer, register it in the consumerOptimizerQoSClient consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID) } + if !loadedRefactored { + consumerOptimizerQoSClient.RegisterOptimizerRefactored(optimizerRefactored, chainID) + } // Create / Use existing ConsumerConsistency newConsumerConsistency := NewConsumerConsistency(chainID) @@ -552,11 +566,12 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 } analyticsServerAddresses := AnalyticsServerAddresses{ - AddApiMethodCallsMetrics: 
viper.GetBool(metrics.AddApiMethodCallsMetrics), - MetricsListenAddress: viper.GetString(metrics.MetricsListenFlagName), - RelayServerAddress: viper.GetString(metrics.RelayServerFlagName), - ReportsAddressFlag: viper.GetString(reportsSendBEAddress), - OptimizerQoSAddress: viper.GetString(common.OptimizerQosServerAddressFlag), + AddApiMethodCallsMetrics: viper.GetBool(metrics.AddApiMethodCallsMetrics), + MetricsListenAddress: viper.GetString(metrics.MetricsListenFlagName), + RelayServerAddress: viper.GetString(metrics.RelayServerFlagName), + ReportsAddressFlag: viper.GetString(reportsSendBEAddress), + OptimizerQoSAddress: viper.GetString(common.OptimizerQosServerAddressFlag), + OptimizerQoSAddressRefactor: viper.GetString(common.OptimizerQosServerAddressRefactorFlag), } var refererData *chainlib.RefererData @@ -646,6 +661,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().IntVar(&provideroptimizer.OptimizerNumTiers, common.SetProviderOptimizerNumberOfTiersToCreate, 4, "set the number of groups to create, default is 4") // optimizer qos reports cmdRPCConsumer.Flags().String(common.OptimizerQosServerAddressFlag, "", "address to send optimizer qos reports to") + cmdRPCConsumer.Flags().String(common.OptimizerQosServerAddressRefactorFlag, "", "address to send optimizer qos reports to the refactored server") cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerPushInterval, common.OptimizerQosServerPushIntervalFlag, time.Minute*5, "interval to push optimizer qos reports") cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerSamplingInterval, common.OptimizerQosServerSamplingIntervalFlag, time.Second*1, "interval to sample optimizer qos reports") cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited") From 3e7dd86338543224ee2ea358c692d20c094d0101 Mon Sep 17 00:00:00 2001 From: Oren Date: Tue, 26 Nov 2024 17:12:11 +0200 Subject: [PATCH 4/8] fix test --- protocol/provideroptimizer/provider_optimizer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index bfba104fad..ded193c979 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -827,7 +827,7 @@ func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) { chainId := "dontcare" - consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient(mockHttpServer.URL, 1*time.Second) + consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient(mockHttpServer.URL, "", 1*time.Second) consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(context.Background(), 900*time.Millisecond) providerOptimizer := NewProviderOptimizer(STRATEGY_BALANCED, TEST_AVERAGE_BLOCK_TIME, TEST_BASE_WORLD_LATENCY, 10, consumerOptimizerQoSClient, chainId) From 17166c67c4d3fb16d4f4d81e5084e99b06a9c130 Mon Sep 17 00:00:00 2001 From: Oren Date: Sun, 1 Dec 2024 10:10:38 +0200 Subject: [PATCH 5/8] add tier chances to report --- .../metrics/consumer_optimizer_qos_client.go | 26 ++++++++++--------- .../provideroptimizer/provider_optimizer.go | 2 ++ .../provider_optimizer_refactor.go | 2 ++ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/protocol/metrics/consumer_optimizer_qos_client.go 
b/protocol/metrics/consumer_optimizer_qos_client.go index 10f84391b0..895745baea 100644 --- a/protocol/metrics/consumer_optimizer_qos_client.go +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -41,21 +41,23 @@ type OptimizerQoSReport struct { LatencyScore float64 GenericScore float64 EntryIndex int + TierChances map[int]float64 } type optimizerQoSReportToSend struct { - Timestamp time.Time `json:"timestamp"` - SyncScore float64 `json:"sync_score"` - AvailabilityScore float64 `json:"availability_score"` - LatencyScore float64 `json:"latency_score"` - GenericScore float64 `json:"generic_score"` - ProviderAddress string `json:"provider"` - ConsumerOrigin string `json:"consumer"` - ChainId string `json:"chain_id"` - NodeErrorRate float64 `json:"node_error_rate"` - Epoch uint64 `json:"epoch"` - ProviderStake int64 `json:"provider_stake"` - EntryIndex int `json:"entry_index"` + Timestamp time.Time `json:"timestamp"` + SyncScore float64 `json:"sync_score"` + AvailabilityScore float64 `json:"availability_score"` + LatencyScore float64 `json:"latency_score"` + GenericScore float64 `json:"generic_score"` + ProviderAddress string `json:"provider"` + ConsumerOrigin string `json:"consumer"` + ChainId string `json:"chain_id"` + NodeErrorRate float64 `json:"node_error_rate"` + Epoch uint64 `json:"epoch"` + ProviderStake int64 `json:"provider_stake"` + EntryIndex int `json:"entry_index"` + TierChances map[int]float64 `json:"tier_chances"` } func (oqosr optimizerQoSReportToSend) String() string { diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index bc216cfd38..672c62f293 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -202,9 +202,11 @@ func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, reports := []*metrics.OptimizerQoSReport{} rawScores := selectionTier.GetRawScores() + tierChances := selectionTier.ShiftTierChance(po.OptimizerNumTiers, map[int]float64{0: ATierChance}) for idx, entry := range rawScores { qosReport := providersScores[entry.Address] qosReport.EntryIndex = idx + qosReport.TierChances = tierChances reports = append(reports, qosReport) } diff --git a/protocol/provideroptimizer/provider_optimizer_refactor.go b/protocol/provideroptimizer/provider_optimizer_refactor.go index 48f25e8295..db666fc9df 100644 --- a/protocol/provideroptimizer/provider_optimizer_refactor.go +++ b/protocol/provideroptimizer/provider_optimizer_refactor.go @@ -168,10 +168,12 @@ func (po *ProviderOptimizer_Refactor) CalculateQoSScoresForMetrics(allAddresses selectionTier, _, providersScores := po.CalculateSelectionTiers_Refactor(allAddresses, ignoredProviders, cu, requestedBlock) reports := []*metrics.OptimizerQoSReport{} + tierChances := selectionTier.ShiftTierChance(po.OptimizerNumTiers, map[int]float64{0: ATierChance_Refactor}) rawScores := selectionTier.GetRawScores() for idx, entry := range rawScores { qosReport := providersScores[entry.Address] qosReport.EntryIndex = idx + qosReport.TierChances = tierChances reports = append(reports, qosReport) } From 9d33edd2085687ff6f03934f7502049d101d254e Mon Sep 17 00:00:00 2001 From: Oren Date: Mon, 2 Dec 2024 17:02:15 +0200 Subject: [PATCH 6/8] fix tier prints --- .../metrics/consumer_optimizer_qos_client.go | 30 ++++++++++--------- .../provideroptimizer/provider_optimizer.go | 2 +- .../provider_optimizer_refactor.go | 10 ++++++- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git 
a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go index 895745baea..985e6895ca 100644 --- a/protocol/metrics/consumer_optimizer_qos_client.go +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -41,23 +41,23 @@ type OptimizerQoSReport struct { LatencyScore float64 GenericScore float64 EntryIndex int - TierChances map[int]float64 + TierChances string } type optimizerQoSReportToSend struct { - Timestamp time.Time `json:"timestamp"` - SyncScore float64 `json:"sync_score"` - AvailabilityScore float64 `json:"availability_score"` - LatencyScore float64 `json:"latency_score"` - GenericScore float64 `json:"generic_score"` - ProviderAddress string `json:"provider"` - ConsumerOrigin string `json:"consumer"` - ChainId string `json:"chain_id"` - NodeErrorRate float64 `json:"node_error_rate"` - Epoch uint64 `json:"epoch"` - ProviderStake int64 `json:"provider_stake"` - EntryIndex int `json:"entry_index"` - TierChances map[int]float64 `json:"tier_chances"` + Timestamp time.Time `json:"timestamp"` + SyncScore float64 `json:"sync_score"` + AvailabilityScore float64 `json:"availability_score"` + LatencyScore float64 `json:"latency_score"` + GenericScore float64 `json:"generic_score"` + ProviderAddress string `json:"provider"` + ConsumerOrigin string `json:"consumer"` + ChainId string `json:"chain_id"` + NodeErrorRate float64 `json:"node_error_rate"` + Epoch uint64 `json:"epoch"` + ProviderStake int64 `json:"provider_stake"` + EntryIndex int `json:"entry_index"` + TierChances string `json:"tier_chances"` } func (oqosr optimizerQoSReportToSend) String() string { @@ -148,6 +148,7 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz Epoch: epoch, NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress), ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch), + TierChances: report.TierChances, } coqc.queueSender.appendQueue(optimizerQoSReportToSend) @@ -169,6 +170,7 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReportRefactored(repor Epoch: epoch, NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress), ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch), + TierChances: report.TierChances, } coqc.queueSenderRefactored.appendQueue(optimizerQoSReportToSend) diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 672c62f293..8898f167a6 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -206,7 +206,7 @@ func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, for idx, entry := range rawScores { qosReport := providersScores[entry.Address] qosReport.EntryIndex = idx - qosReport.TierChances = tierChances + qosReport.TierChances = PrintTierChances(tierChances) reports = append(reports, qosReport) } diff --git a/protocol/provideroptimizer/provider_optimizer_refactor.go b/protocol/provideroptimizer/provider_optimizer_refactor.go index db666fc9df..baf0d2be50 100644 --- a/protocol/provideroptimizer/provider_optimizer_refactor.go +++ b/protocol/provideroptimizer/provider_optimizer_refactor.go @@ -173,13 +173,21 @@ func (po *ProviderOptimizer_Refactor) CalculateQoSScoresForMetrics(allAddresses for idx, entry := range rawScores { qosReport := providersScores[entry.Address] qosReport.EntryIndex = idx - qosReport.TierChances = tierChances + qosReport.TierChances = 
PrintTierChances(tierChances) reports = append(reports, qosReport) } return reports } +func PrintTierChances(tierChances map[int]float64) string { + var tierChancesString string + for tier, chance := range tierChances { + tierChancesString += fmt.Sprintf("%d: %f, ", tier, chance) + } + return tierChancesString +} + func (po *ProviderOptimizer_Refactor) CalculateSelectionTiers_Refactor(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration_Refactor, map[string]*metrics.OptimizerQoSReport) { explorationCandidate := Exploration_Refactor{address: "", time: time.Now().Add(time.Hour)} selectionTier := NewSelectionTier() From 0378ad1605de97b33d1448d41e2f760c88ff6924 Mon Sep 17 00:00:00 2001 From: Oren Date: Mon, 2 Dec 2024 17:03:24 +0200 Subject: [PATCH 7/8] updated http server and added plot data scripts --- scripts/test/httpServer.py | 46 +++++++++++++++-------- scripts/test/plot_data.py | 77 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 15 deletions(-) create mode 100644 scripts/test/plot_data.py diff --git a/scripts/test/httpServer.py b/scripts/test/httpServer.py index 94358ac9d6..b7adea787d 100644 --- a/scripts/test/httpServer.py +++ b/scripts/test/httpServer.py @@ -1,12 +1,35 @@ +import json +import csv from http.server import BaseHTTPRequestHandler, HTTPServer import sys class RequestHandler(BaseHTTPRequestHandler): + def __init__(self, *args, **kwargs): + self.csv_file_name = kwargs.pop('csv_file_name', 'data.csv') + super().__init__(*args, **kwargs) + def do_GET(self): self.print_request() def do_POST(self): - self.print_request() + content_length = int(self.headers.get("Content-Length", 0)) + if content_length > 0: + body = self.rfile.read(content_length) + data = json.loads(body.decode('utf-8')) + self.write_to_csv(data) + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + self.wfile.write(b"OK") + + def write_to_csv(self, data): + with open(self.csv_file_name, 'a', newline='') as csvfile: + fieldnames = data[0].keys() + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + if csvfile.tell() == 0: # Check if file is empty to write header + writer.writeheader() + for entry in data: + writer.writerow(entry) def print_request(self): # Print request line @@ -25,21 +48,14 @@ def print_request(self): body = self.rfile.read(content_length) print(f"Body:\n{body.decode('utf-8')}") - # Send a response back to the client - self.send_response(200) - self.send_header("Content-type", "text/html") - self.end_headers() - self.wfile.write(b"OK") - -def run_server(port=8000): +def run_server(port=8000, csv_file_name='data.csv'): server_address = ('', port) - httpd = HTTPServer(server_address, RequestHandler) - print(f"Server running on port {port}") + handler = lambda *args, **kwargs: RequestHandler(*args, csv_file_name=csv_file_name, **kwargs) + httpd = HTTPServer(server_address, handler) + print(f"Server running on port {port}, writing to {csv_file_name}") httpd.serve_forever() if __name__ == '__main__': - if len(sys.argv) > 1: - port = int(sys.argv[1]) - run_server(port) - else: - run_server() \ No newline at end of file + port = int(sys.argv[1]) if len(sys.argv) > 1 else 8000 + csv_file_name = sys.argv[2] if len(sys.argv) > 2 else 'data.csv' + run_server(port, csv_file_name) \ No newline at end of file diff --git a/scripts/test/plot_data.py b/scripts/test/plot_data.py new file mode 100644 index 0000000000..ecc23f6c32 --- /dev/null +++ 
b/scripts/test/plot_data.py
@@ -0,0 +1,77 @@
+import csv
+import matplotlib.pyplot as plt
+from dateutil import parser
+import sys
+import random
+from collections import defaultdict
+
+def read_csv(file_path):
+    data = []
+    with open(file_path, 'r') as csvfile:
+        reader = csv.DictReader(csvfile)
+        for row in reader:
+            data.append(row)
+    return data
+
+def group_by_provider(data):
+    grouped_data = defaultdict(list)
+    for entry in data:
+        grouped_data[entry['provider']].append(entry)
+    return grouped_data
+
+def plot_graphs(data, provider, output_file):
+    timestamps = [parser.parse(entry['timestamp']) for entry in data]
+    sync_scores = [float(entry['sync_score']) for entry in data]
+    availability_scores = [float(entry['availability_score']) for entry in data]
+    latency_scores = [float(entry['latency_score']) for entry in data]
+    generic_scores = [float(entry['generic_score']) for entry in data]
+    node_error_rates = [float(entry['node_error_rate']) for entry in data]
+    provider_stake = data[0]['provider_stake']  # Assuming stake is constant
+
+    avg_node_error_rate = sum(node_error_rates) / len(node_error_rates)
+
+    plt.figure(figsize=(10, 5))
+    plt.plot(timestamps, sync_scores, label='Sync Score')
+    plt.plot(timestamps, availability_scores, label='Availability Score')
+    plt.plot(timestamps, latency_scores, label='Latency Score')
+    plt.plot(timestamps, generic_scores, label='Generic Score')
+    plt.xlabel('Timestamp')
+    plt.ylabel('Scores')
+    plt.title(f'Provider: {provider}, Avg Node Error Rate: {avg_node_error_rate:.2f}, Stake: {provider_stake}')
+    plt.legend()
+    plt.xticks(rotation=45)
+    plt.tight_layout()
+    plt.savefig(output_file)
+    print(f"Plot saved as {output_file}")
+
+if __name__ == '__main__':
+    if len(sys.argv) < 3:
+        print("Usage: python plot_data.py <current_csv_file> <refactored_csv_file>")
+        sys.exit(1)
+
+    current_csv_file = sys.argv[1]
+    refactored_csv_file = sys.argv[2]
+
+    current_data = read_csv(current_csv_file)
+    refactored_data = read_csv(refactored_csv_file)
+
+    current_grouped_data = group_by_provider(current_data)
+    refactored_grouped_data = group_by_provider(refactored_data)
+
+    # Randomly select one provider from the current mechanism data
+    selected_provider = random.choice(list(current_grouped_data.keys()))
+
+    # Ensure the selected provider exists in both datasets
+    if selected_provider not in refactored_grouped_data:
+        print(f"Provider {selected_provider} not found in refactored data.")
+        sys.exit(1)
+
+    current_selected_data = current_grouped_data[selected_provider]
+    refactored_selected_data = refactored_grouped_data[selected_provider]
+
+    # Extract the last 4 characters of the provider address
+    provider_suffix = selected_provider.split('@')[-1][-4:]
+
+    # Plot graphs for both datasets
+    plot_graphs(current_selected_data, selected_provider, f"provider_{provider_suffix}_current.png")
+    plot_graphs(refactored_selected_data, selected_provider, f"provider_{provider_suffix}_refactor.png")
\ No newline at end of file

From 813e1167946f68e1209e003c7b8c12064d7a4f41 Mon Sep 17 00:00:00 2001
From: Oren
Date: Mon, 2 Dec 2024 17:33:35 +0200
Subject: [PATCH 8/8] update tiers plot

---
 scripts/test/plot_data.py | 35 ++++++++++++++++++++++++++++++---
 1 file changed, 32 insertions(+), 3 deletions(-)

diff --git a/scripts/test/plot_data.py b/scripts/test/plot_data.py
index ecc23f6c32..a3ed0607ac 100644
--- a/scripts/test/plot_data.py
+++ b/scripts/test/plot_data.py
@@ -19,7 +19,7 @@ def group_by_provider(data):
         grouped_data[entry['provider']].append(entry)
     return grouped_data
 
-def plot_graphs(data, provider, output_file):
+def plot_graphs(data, provider, output_file, is_refactored=False):
     timestamps = [parser.parse(entry['timestamp']) for entry in data]
     sync_scores = [float(entry['sync_score']) for entry in data]
     availability_scores = [float(entry['availability_score']) for entry in data]
@@ -37,13 +37,39 @@ def plot_graphs(data, provider, output_file):
     plt.plot(timestamps, generic_scores, label='Generic Score')
     plt.xlabel('Timestamp')
     plt.ylabel('Scores')
-    plt.title(f'Provider: {provider}, Avg Node Error Rate: {avg_node_error_rate:.2f}, Stake: {provider_stake}')
+    title_suffix = " (Refactor)" if is_refactored else ""
+    plt.title(f'Provider: {provider}, Avg Node Error Rate: {avg_node_error_rate:.2f}, Stake: {provider_stake}{title_suffix}')
     plt.legend()
     plt.xticks(rotation=45)
     plt.tight_layout()
     plt.savefig(output_file)
     print(f"Plot saved as {output_file}")
 
+def plot_tier_chances_over_time(data, provider, output_file, is_refactored=False):
+    timestamps = [parser.parse(entry['timestamp']) for entry in data]
+    tier_chances_dicts = [
+        {int(k): float(v) for k, v in (item.split(': ') for item in entry['tier_chances'].split(', ') if item)}
+        for entry in data
+    ]
+
+    # Collect all unique tiers
+    all_tiers = set(tier for tier_chances in tier_chances_dicts for tier in tier_chances.keys())
+
+    plt.figure(figsize=(10, 5))
+    for tier in sorted(all_tiers):
+        tier_chances_over_time = [tier_chances.get(tier, 0) for tier_chances in tier_chances_dicts]
+        plt.plot(timestamps, tier_chances_over_time, label=f'Tier {tier}')
+
+    plt.xlabel('Timestamp')
+    plt.ylabel('Chance')
+    title_suffix = " (Refactor)" if is_refactored else ""
+    plt.title(f'Provider: {provider} Tier Chances Over Time{title_suffix}')
+    plt.legend()
+    plt.xticks(rotation=45)
+    plt.tight_layout()
+    plt.savefig(output_file)
+    print(f"Tier chances over time plot saved as {output_file}")
+
 if __name__ == '__main__':
     if len(sys.argv) < 3:
         print("Usage: python plot_data.py <current_csv_file> <refactored_csv_file>")
@@ -74,4 +100,7 @@ def plot_graphs(data, provider, output_file):
 
     # Plot graphs for both datasets
     plot_graphs(current_selected_data, selected_provider, f"provider_{provider_suffix}_current.png")
-    plot_graphs(refactored_selected_data, selected_provider, f"provider_{provider_suffix}_refactor.png")
\ No newline at end of file
+    plot_tier_chances_over_time(current_selected_data, selected_provider, f"provider_{provider_suffix}_current_tier_chances.png")
+
+    plot_graphs(refactored_selected_data, selected_provider, f"provider_{provider_suffix}_refactor.png", is_refactored=True)
+    plot_tier_chances_over_time(refactored_selected_data, selected_provider, f"provider_{provider_suffix}_refactor_tier_chances.png", is_refactored=True)
\ No newline at end of file
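
To make the dual-reporting path in patches 3-4 concrete, below is a minimal wiring sketch adapted from `TestProviderOptimizerWithOptimizerQoSClient` and the `rpcconsumer` changes in this series. In the real consumer this happens per chain inside `getOrCreateChainAssets`; here the endpoint URLs, chain id, block time, base latency, and concurrency values are illustrative assumptions, not values taken from the patches.

```go
package main

import (
	"context"
	"time"

	"github.com/lavanet/lava/v4/protocol/metrics"
	"github.com/lavanet/lava/v4/protocol/provideroptimizer"
)

func main() {
	chainId := "LAV1" // placeholder chain id

	// One client feeds two report sinks: the current optimizer mechanism and the
	// refactored one, each pushed to its own httpServer.py instance (addresses assumed).
	qosClient := metrics.NewConsumerOptimizerQoSClient(
		"http://127.0.0.1:8000", // current-mechanism sink
		"http://127.0.0.1:8001", // refactored-mechanism sink
		time.Minute,             // push interval
	)
	// Sample registered optimizers periodically and enqueue their QoS reports.
	qosClient.StartOptimizersQoSReportsCollecting(context.Background(), time.Second)

	// The current optimizer reports through the first queue sender...
	optimizer := provideroptimizer.NewProviderOptimizer(
		provideroptimizer.STRATEGY_BALANCED, 10*time.Second, 150*time.Millisecond, 10, qosClient, chainId)
	qosClient.RegisterOptimizer(optimizer, chainId)

	// ...and the refactored optimizer through the second one.
	optimizerRefactored := provideroptimizer.NewProviderOptimizer_Refactor(
		provideroptimizer.STRATEGY_BALANCED, 10*time.Second, 10)
	qosClient.RegisterOptimizerRefactored(optimizerRefactored, chainId)

	select {} // keep the sampling and push goroutines alive
}
```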
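From there, one plausible end-to-end check of the measurement loop added in patches 7-8 (ports and file names are examples, not prescribed values): run `python3 scripts/test/httpServer.py 8000 current.csv` and `python3 scripts/test/httpServer.py 8001 refactor.csv`, start a consumer with `--optimizer-qos-server-address http://127.0.0.1:8000` and `--optimizer-qos-server-address-refactor http://127.0.0.1:8001`, let it accumulate samples, then run `python3 scripts/test/plot_data.py current.csv refactor.csv`. The plotting script picks a random provider present in both CSVs and writes per-provider score plots plus, after patch 8, tier-chance-over-time plots for the current and refactored mechanisms side by side.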