
refactor: Optimizer Refactor Part 2: Add Prometheus metrics for the refactored optimizer #1802

Open

wants to merge 10 commits into base: CNS-1008-score-store-refactor
167 changes: 164 additions & 3 deletions protocol/metrics/consumer_metrics_manager.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/lavanet/lava/v4/utils"
scoreutils "github.com/lavanet/lava/v4/utils/score"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -68,6 +69,22 @@ type ConsumerMetricsManager struct {
relayProcessingLatencyAfterProvider *prometheus.GaugeVec
averageProcessingLatency map[string]*LatencyTracker
consumerOptimizerQoSClient *ConsumerOptimizerQoSClient

// optimizer metrics
optimizerProviderScore *prometheus.GaugeVec
optimizerProviderLatency *prometheus.GaugeVec
optimizerProviderSync *prometheus.GaugeVec
optimizerProviderAvailability *prometheus.GaugeVec
optimizerProviderTier *prometheus.GaugeVec
optimizerTierChance *prometheus.GaugeVec

// refactored optimizer metrics
optimizerRefactorProviderScore *prometheus.GaugeVec
optimizerRefactorProviderLatency *prometheus.GaugeVec
optimizerRefactorProviderSync *prometheus.GaugeVec
optimizerRefactorProviderAvailability *prometheus.GaugeVec
optimizerRefactorProviderTier *prometheus.GaugeVec
optimizerRefactorTierChance *prometheus.GaugeVec
}

type ConsumerMetricsManagerOptions struct {
@@ -226,6 +243,68 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
Help: "average latency of processing a successful relay after it is received from the provider in µs (10^6)",
}, []string{"spec", "apiInterface"})

optimizerProviderScore := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_provider_score",
Help: "[Optimizer] The total score of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerProviderLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_provider_latency",
Help: "[Optimizer] The latency of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerProviderSync := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_provider_sync",
Help: "[Optimizer] The sync of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerProviderAvailability := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_provider_availability",
Help: "[Optimizer] The availability of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerProviderTier := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_provider_tier",
Help: "[Optimizer] The tier of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerTierChance := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_provider_tiers_chances",
Help: "[Optimizer] The chances of a tier being selected by the optimizer",
}, []string{"spec", "api_interface", "tier", "epoch"})

// refactored optimizer metrics, registered under their own "refactor" metric
// names: registering a second collector under an already-used fully-qualified
// name would make prometheus.MustRegister panic at startup

optimizerRefactorProviderScore := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_refactor_provider_score",
Help: "[Optimizer Refactor] The total score of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerRefactorProviderLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_refactor_provider_latency",
Help: "[Optimizer Refactor] The latency of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerRefactorProviderSync := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_refactor_provider_sync",
Help: "[Optimizer Refactor] The sync of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerRefactorProviderAvailability := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_refactor_provider_availability",
Help: "[Optimizer Refactor] The availability of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerRefactorProviderTier := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_refactor_provider_tier",
Help: "[Optimizer Refactor] The tier of a provider",
}, []string{"spec", "api_interface", "provider_address", "epoch"})

optimizerRefactorTierChance := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_optimizer_refactor_provider_tiers_chances",
Help: "[Optimizer Refactor] The chances of a tier being selected by the optimizer",
}, []string{"spec", "api_interface", "tier", "epoch"})

// Register the metrics with the Prometheus registry.
prometheus.MustRegister(totalCURequestedMetric)
prometheus.MustRegister(totalRelaysRequestedMetric)
@@ -253,6 +332,18 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalWsSubscriptionDissconnectMetric)
prometheus.MustRegister(optimizerProviderScore)
prometheus.MustRegister(optimizerProviderLatency)
prometheus.MustRegister(optimizerProviderSync)
prometheus.MustRegister(optimizerProviderAvailability)
prometheus.MustRegister(optimizerProviderTier)
prometheus.MustRegister(optimizerTierChance)
prometheus.MustRegister(optimizerRefactorProviderScore)
prometheus.MustRegister(optimizerRefactorProviderLatency)
prometheus.MustRegister(optimizerRefactorProviderSync)
prometheus.MustRegister(optimizerRefactorProviderAvailability)
prometheus.MustRegister(optimizerRefactorProviderTier)
prometheus.MustRegister(optimizerRefactorTierChance)
prometheus.MustRegister(totalLoLSuccessMetric)
prometheus.MustRegister(totalLoLErrorsMetric)

@@ -288,9 +379,23 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider,
relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider,
averageProcessingLatency: map[string]*LatencyTracker{},
totalLoLSuccessMetric: totalLoLSuccessMetric,
totalLoLErrorsMetric: totalLoLErrorsMetric,
consumerOptimizerQoSClient: options.ConsumerOptimizerQoSClient,

optimizerProviderScore: optimizerProviderScore,
optimizerProviderLatency: optimizerProviderLatency,
optimizerProviderSync: optimizerProviderSync,
optimizerProviderAvailability: optimizerProviderAvailability,
optimizerProviderTier: optimizerProviderTier,
optimizerTierChance: optimizerTierChance,

optimizerRefactorProviderScore: optimizerRefactorProviderScore,
optimizerRefactorProviderLatency: optimizerRefactorProviderLatency,
optimizerRefactorProviderSync: optimizerRefactorProviderSync,
optimizerRefactorProviderAvailability: optimizerRefactorProviderAvailability,
optimizerRefactorProviderTier: optimizerRefactorProviderTier,
optimizerRefactorTierChance: optimizerRefactorTierChance,
totalLoLSuccessMetric: totalLoLSuccessMetric,
totalLoLErrorsMetric: totalLoLErrorsMetric,
consumerOptimizerQoSClient: options.ConsumerOptimizerQoSClient,
}

http.Handle("/metrics", promhttp.Handler())
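
A note on the metric naming above: the default Prometheus registry rejects a second collector whose fully-qualified name is already registered, which is why the refactored gauges carry distinct `..._refactor_...` names instead of reusing the legacy ones. A standalone sketch of the failure mode (not part of this PR; names are illustrative):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	// two distinct GaugeVecs that share one fully-qualified metric name
	legacy := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "lava_consumer_optimizer_provider_score",
		Help: "[Optimizer] The total score of a provider",
	}, []string{"spec", "api_interface", "provider_address", "epoch"})
	refactor := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "lava_consumer_optimizer_provider_score", // same name as above
		Help: "[Optimizer Refactor] The total score of a provider",
	}, []string{"spec", "api_interface", "provider_address", "epoch"})

	prometheus.MustRegister(legacy)   // succeeds
	prometheus.MustRegister(refactor) // panics: the name is already registered
}
```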
Expand Down Expand Up @@ -581,6 +686,62 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain
pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc()
}

func (pme *ConsumerMetricsManager) SetOptimizerProviderScoreMetric(chainId string, apiInterface string, providerAddress string, epoch uint64, scoreType string, refactored bool, score float64) {
if pme == nil {
return
}

switch scoreType {
case scoreutils.LatencyScoreType:
if refactored {
pme.optimizerRefactorProviderLatency.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
} else {
pme.optimizerProviderLatency.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
}
case scoreutils.SyncScoreType:
if refactored {
pme.optimizerRefactorProviderSync.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
} else {
pme.optimizerProviderSync.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
}
case scoreutils.AvailabilityScoreType:
if refactored {
pme.optimizerRefactorProviderAvailability.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
} else {
pme.optimizerProviderAvailability.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
}
case scoreutils.TotalScoreType:
if refactored {
pme.optimizerRefactorProviderScore.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
} else {
pme.optimizerProviderScore.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
}
default:
utils.LavaFormatError("Unknown score type", nil, utils.Attribute{Key: "scoreType", Value: scoreType})
}
}
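
A sketch of how a caller might drive this setter — the call sites live outside this diff, and the spec, provider address, epoch, and score below are all illustrative:

```go
// hypothetical call site, not part of this PR's diff:
// report the refactored optimizer's latency score for one provider at epoch 420
pme.SetOptimizerProviderScoreMetric("LAV1", "tendermintrpc", "lava@1provider...", 420, scoreutils.LatencyScoreType, true, 0.25)
```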

func (pme *ConsumerMetricsManager) SetOptimizerProviderTierMetric(chainId string, apiInterface string, providerAddress string, epoch uint64, refactored bool, tier int) {
if pme == nil {
return
}
if refactored {
pme.optimizerRefactorProviderTier.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(float64(tier))
} else {
pme.optimizerProviderTier.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(float64(tier))
}
}

func (pme *ConsumerMetricsManager) SetOptimizerTierChanceMetric(chainId string, apiInterface string, tier int, epoch uint64, refactored bool, chance float64) {
if pme == nil {
return
}
if refactored {
pme.optimizerRefactorTierChance.WithLabelValues(chainId, apiInterface, fmt.Sprintf("%d", tier), fmt.Sprintf("%d", epoch)).Set(chance)
} else {
pme.optimizerTierChance.WithLabelValues(chainId, apiInterface, fmt.Sprintf("%d", tier), fmt.Sprintf("%d", epoch)).Set(chance)
}
}
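
And likewise for the tier setters (values again illustrative; the `refactored` flag routes the sample to the refactor gauges):

```go
// hypothetical call sites, not part of this PR's diff
pme.SetOptimizerProviderTierMetric("LAV1", "tendermintrpc", "lava@1provider...", 420, true, 0)
pme.SetOptimizerTierChanceMetric("LAV1", "tendermintrpc", 0, 420, true, 0.75)
```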

func (pme *ConsumerMetricsManager) SetLoLResponse(success bool) {
if pme == nil {
return
4 changes: 4 additions & 0 deletions protocol/metrics/consumer_optimizer_qos_client.go
@@ -41,6 +41,7 @@ type OptimizerQoSReport struct {
LatencyScore float64
GenericScore float64
EntryIndex int
TierChances string
}

type OptimizerQoSReportToSend struct {
@@ -57,6 +58,7 @@ type OptimizerQoSReportToSend struct {
Epoch uint64 `json:"epoch"`
ProviderStake int64 `json:"provider_stake"`
EntryIndex int `json:"entry_index"`
TierChances string `json:"tier_chances"`
}
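
For context, marshaling a populated report would surface the new field in the queued JSON payload along these lines (a sketch; only fields visible in this diff are shown and all values are invented):

```go
// hypothetical, for illustration only
b, err := json.Marshal(reportToSend) // reportToSend is an OptimizerQoSReportToSend
if err == nil {
	fmt.Println(string(b))
	// ... ,"epoch":420,"provider_stake":50000,"entry_index":0,"tier_chances":"0: 0.750000, 1: 0.250000"}
}
```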

func (oqosr OptimizerQoSReportToSend) String() string {
@@ -145,6 +147,7 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz
Epoch: epoch,
NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress),
ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch),
TierChances: report.TierChances,
}

coqc.queueSender.appendQueue(optimizerQoSReportToSend)
@@ -172,6 +175,7 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() []OptimizerQo
reportsToSend = append(reportsToSend, coqc.appendOptimizerQoSReport(report, chainId, currentEpoch))
}
}

return reportsToSend
}

38 changes: 24 additions & 14 deletions protocol/provideroptimizer/provider_optimizer.go
@@ -216,6 +216,30 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
)
}

func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport {
selectionTier, _, providersScores := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
reports := []*metrics.OptimizerQoSReport{}

rawScores := selectionTier.GetRawScores()
tierChances := selectionTier.ShiftTierChance(po.OptimizerNumTiers, map[int]float64{0: ATierChance})
for idx, entry := range rawScores {
qosReport := providersScores[entry.Address]
qosReport.EntryIndex = idx
qosReport.TierChances = PrintTierChances(tierChances)
reports = append(reports, qosReport)
}

return reports
}

func PrintTierChances(tierChances map[int]float64) string {
// sort the tiers first: Go randomizes map iteration order, which would make the
// reported string nondeterministic (uses the standard "sort" and "strings" imports)
tiers := make([]int, 0, len(tierChances))
for tier := range tierChances {
tiers = append(tiers, tier)
}
sort.Ints(tiers)
parts := make([]string, 0, len(tiers))
for _, tier := range tiers {
parts = append(parts, fmt.Sprintf("%d: %f", tier, tierChances[tier]))
}
return strings.Join(parts, ", ") // no trailing separator
}
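
A quick worked example of the helper, given the deterministic ordering above:

```go
// hypothetical usage
chances := map[int]float64{0: 0.75, 1: 0.25}
fmt.Println(PrintTierChances(chances)) // 0: 0.750000, 1: 0.250000
```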

func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration, map[string]*metrics.OptimizerQoSReport) {
explorationCandidate := Exploration{address: "", time: time.Now().Add(time.Hour)}
selectionTier := NewSelectionTier()
@@ -609,17 +633,3 @@ func (po *ProviderOptimizer) GetExcellenceQoSReportForProvider(providerAddress s

return report, providerData.Latency.GetLastUpdateTime()
}

func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport {
selectionTier, _, providersScores := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
reports := []*metrics.OptimizerQoSReport{}

rawScores := selectionTier.GetRawScores()
for idx, entry := range rawScores {
qosReport := providersScores[entry.Address]
qosReport.EntryIndex = idx
reports = append(reports, qosReport)
}

return reports
}
96 changes: 95 additions & 1 deletion protocol/rpcconsumer/rpcconsumer.go
@@ -243,6 +243,13 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
for _, rpcEndpoint := range options.rpcEndpoints {
go func(rpcEndpoint *lavasession.RPCEndpoint) error {
defer wg.Done()
chainID := rpcEndpoint.ChainID
chainParser, err := chainlib.NewChainParser(rpcEndpoint.ApiInterface)
if err != nil {
err = utils.LavaFormatError("failed creating chain parser", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return err
}
rpcConsumerServer, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker,
policyUpdaters, optimizers, consumerConsistencies, finalizationConsensuses, chainMutexes,
options, privKey, lavaChainID, rpcConsumerMetrics, consumerReportsManager, consumerOptimizerQoSClient,
@@ -262,7 +269,94 @@
}()
}
}
return err

err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, options.cmdFlags.StaticSpecPath, *rpcEndpoint, rpcc.consumerStateTracker)
if err != nil {
err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return err
}

_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
var optimizer *provideroptimizer.ProviderOptimizer
var consumerConsistency *ConsumerConsistency
var finalizationConsensus *finalizationconsensus.FinalizationConsensus
getOrCreateChainAssets := func() error {
// lock per chain so concurrent endpoint goroutines don't race on optimizer creation
chainMutexes[chainID].Lock()
defer chainMutexes[chainID].Unlock()
var loaded bool
var err error

// Create / Use existing optimizer
newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, options.maxConcurrentProviders, consumerOptimizerQoSClient, chainID)
optimizer, loaded, err = optimizers.LoadOrStore(chainID, newOptimizer)
if err != nil {
return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}

if !loaded {
// if this is a new optimizer, register it in the consumerOptimizerQoSClient
consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID)
}

// Create / Use existing ConsumerConsistency
newConsumerConsistency := NewConsumerConsistency(chainID)
consumerConsistency, _, err = consumerConsistencies.LoadOrStore(chainID, newConsumerConsistency)
if err != nil {
return utils.LavaFormatError("failed loading consumer consistency", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}

// Create / Use existing FinalizationConsensus
newFinalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
finalizationConsensus, loaded, err = finalizationConsensuses.LoadOrStore(chainID, newFinalizationConsensus)
if err != nil {
return utils.LavaFormatError("failed loading finalization consensus", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}
if !loaded { // a newly created finalization consensus instance must be registered for updates
consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus)
}
return nil
}
err = getOrCreateChainAssets()
if err != nil {
errCh <- err
return err
}

if finalizationConsensus == nil || optimizer == nil || consumerConsistency == nil {
err = utils.LavaFormatError("failed getting assets, found a nil", nil, utils.Attribute{Key: "endpoint", Value: rpcEndpoint.Key()})
errCh <- err
return err
}

// Create active subscription provider storage for each unique chain
activeSubscriptionProvidersStorage := lavasession.NewActiveSubscriptionProvidersStorage()
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String(), activeSubscriptionProvidersStorage)
// Register For Updates
rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, options.staticProvidersList)

var relaysMonitor *metrics.RelaysMonitor
if options.cmdFlags.RelaysHealthEnableFlag {
relaysMonitor = metrics.NewRelaysMonitor(options.cmdFlags.RelaysHealthIntervalFlag, rpcEndpoint.ChainID, rpcEndpoint.ApiInterface)
relaysMonitorAggregator.RegisterRelaysMonitor(rpcEndpoint.String(), relaysMonitor)
}

var consumerWsSubscriptionManager *chainlib.ConsumerWSSubscriptionManager
var specMethodType string
if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC {
specMethodType = http.MethodPost
}
consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager)

utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()})
err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager)
if err != nil {
err = utils.LavaFormatError("failed serving rpc requests", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return err
}
return nil
}(rpcEndpoint)
}
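
The getOrCreateChainAssets closure above is a create-or-reuse pattern: a per-chain mutex serializes creation, so only the goroutine that actually stores a new optimizer performs the one-time registration. A self-contained sketch of the same shape (the types and names here are stand-ins, not the repo's lavasession implementations):

```go
package main

import (
	"fmt"
	"sync"
)

type optimizer struct{ chainID string }

var (
	chainMutexes = map[string]*sync.Mutex{"LAV1": {}}
	optimizers   sync.Map // chainID -> *optimizer
)

func getOrCreateOptimizer(chainID string) *optimizer {
	// lock per chain so concurrent endpoint goroutines don't race on creation
	chainMutexes[chainID].Lock()
	defer chainMutexes[chainID].Unlock()
	actual, loaded := optimizers.LoadOrStore(chainID, &optimizer{chainID: chainID})
	opt := actual.(*optimizer)
	if !loaded {
		// first creation for this chain: do one-time registration here,
		// mirroring RegisterOptimizer / RegisterFinalizationConsensusForUpdates above
		fmt.Println("registered new optimizer for", chainID)
	}
	return opt
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); getOrCreateOptimizer("LAV1") }()
	}
	wg.Wait() // "registered new optimizer" prints exactly once
}
```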
