diff --git a/protocol/metrics/consumer_metrics_manager.go b/protocol/metrics/consumer_metrics_manager.go
index ae4ee74319..d24575affc 100644
--- a/protocol/metrics/consumer_metrics_manager.go
+++ b/protocol/metrics/consumer_metrics_manager.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/lavanet/lava/v4/utils"
+	scoreutils "github.com/lavanet/lava/v4/utils/score"
 	pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -68,6 +69,22 @@ type ConsumerMetricsManager struct {
 	relayProcessingLatencyAfterProvider *prometheus.GaugeVec
 	averageProcessingLatency            map[string]*LatencyTracker
 	consumerOptimizerQoSClient          *ConsumerOptimizerQoSClient
+
+	// optimizer metrics
+	optimizerProviderScore        *prometheus.GaugeVec
+	optimizerProviderLatency      *prometheus.GaugeVec
+	optimizerProviderSync         *prometheus.GaugeVec
+	optimizerProviderAvailability *prometheus.GaugeVec
+	optimizerProviderTier         *prometheus.GaugeVec
+	optimizerTierChance           *prometheus.GaugeVec
+
+	// refactored optimizer metrics
+	optimizerRefactorProviderScore        *prometheus.GaugeVec
+	optimizerRefactorProviderLatency      *prometheus.GaugeVec
+	optimizerRefactorProviderSync         *prometheus.GaugeVec
+	optimizerRefactorProviderAvailability *prometheus.GaugeVec
+	optimizerRefactorProviderTier         *prometheus.GaugeVec
+	optimizerRefactorTierChance           *prometheus.GaugeVec
 }
 
 type ConsumerMetricsManagerOptions struct {
@@ -226,6 +243,69 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
 		Help: "average latency of processing a successful relay after it is received from the provider in µs (10^6)",
 	}, []string{"spec", "apiInterface"})
 
+	optimizerProviderScore := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_provider_score",
+		Help: "[Optimizer] The total score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerProviderLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_provider_latency",
+		Help: "[Optimizer] The latency score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerProviderSync := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_provider_sync",
+		Help: "[Optimizer] The sync score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerProviderAvailability := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_provider_availability",
+		Help: "[Optimizer] The availability score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerProviderTier := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_provider_tier",
+		Help: "[Optimizer] The tier of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerTierChance := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_provider_tiers_chances",
+		Help: "[Optimizer] The chances of a tier being selected by the optimizer",
+	}, []string{"spec", "api_interface", "tier", "epoch"})
+
+	// refactored optimizer metrics; these must use names distinct from the
+	// originals above, since MustRegister panics on a duplicate name + label set
+	optimizerRefactorProviderScore := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_refactor_provider_score",
+		Help: "[Optimizer Refactor] The total score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerRefactorProviderLatency := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_refactor_provider_latency",
+		Help: "[Optimizer Refactor] The latency score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerRefactorProviderSync := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_refactor_provider_sync",
+		Help: "[Optimizer Refactor] The sync score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerRefactorProviderAvailability := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_refactor_provider_availability",
+		Help: "[Optimizer Refactor] The availability score of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerRefactorProviderTier := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_refactor_provider_tier",
+		Help: "[Optimizer Refactor] The tier of a provider",
+	}, []string{"spec", "api_interface", "provider_address", "epoch"})
+
+	optimizerRefactorTierChance := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "lava_consumer_optimizer_refactor_provider_tiers_chances",
+		Help: "[Optimizer Refactor] The chances of a tier being selected by the optimizer",
+	}, []string{"spec", "api_interface", "tier", "epoch"})
+
 	// Register the metrics with the Prometheus registry.
 	prometheus.MustRegister(totalCURequestedMetric)
 	prometheus.MustRegister(totalRelaysRequestedMetric)
@@ -253,6 +333,18 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
 	prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric)
 	prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric)
 	prometheus.MustRegister(totalWsSubscriptionDissconnectMetric)
+	prometheus.MustRegister(optimizerProviderScore)
+	prometheus.MustRegister(optimizerProviderLatency)
+	prometheus.MustRegister(optimizerProviderSync)
+	prometheus.MustRegister(optimizerProviderAvailability)
+	prometheus.MustRegister(optimizerProviderTier)
+	prometheus.MustRegister(optimizerTierChance)
+	prometheus.MustRegister(optimizerRefactorProviderScore)
+	prometheus.MustRegister(optimizerRefactorProviderLatency)
+	prometheus.MustRegister(optimizerRefactorProviderSync)
+	prometheus.MustRegister(optimizerRefactorProviderAvailability)
+	prometheus.MustRegister(optimizerRefactorProviderTier)
+	prometheus.MustRegister(optimizerRefactorTierChance)
 	prometheus.MustRegister(totalLoLSuccessMetric)
 	prometheus.MustRegister(totalLoLErrorsMetric)
 
@@ -288,9 +380,23 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
 		relayProcessingLatencyBeforeProvider:   relayProcessingLatencyBeforeProvider,
 		relayProcessingLatencyAfterProvider:    relayProcessingLatencyAfterProvider,
 		averageProcessingLatency:               map[string]*LatencyTracker{},
-		totalLoLSuccessMetric:                  totalLoLSuccessMetric,
-		totalLoLErrorsMetric:                   totalLoLErrorsMetric,
-		consumerOptimizerQoSClient:             options.ConsumerOptimizerQoSClient,
+
+		optimizerProviderScore:        optimizerProviderScore,
+		optimizerProviderLatency:      optimizerProviderLatency,
+		optimizerProviderSync:         optimizerProviderSync,
+		optimizerProviderAvailability: optimizerProviderAvailability,
+		optimizerProviderTier:         optimizerProviderTier,
+		optimizerTierChance:           optimizerTierChance,
+
+		optimizerRefactorProviderScore:        optimizerRefactorProviderScore,
+		optimizerRefactorProviderLatency:      optimizerRefactorProviderLatency,
+		optimizerRefactorProviderSync:         optimizerRefactorProviderSync,
+		optimizerRefactorProviderAvailability: optimizerRefactorProviderAvailability,
+		optimizerRefactorProviderTier:         optimizerRefactorProviderTier,
+		optimizerRefactorTierChance:           optimizerRefactorTierChance,
+		totalLoLSuccessMetric:                 totalLoLSuccessMetric,
+		totalLoLErrorsMetric:                  totalLoLErrorsMetric,
+		consumerOptimizerQoSClient:            options.ConsumerOptimizerQoSClient,
 	}
 
 	http.Handle("/metrics", promhttp.Handler())
@@ -581,6 +687,63 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain
 	pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc()
 }
 
+func (pme *ConsumerMetricsManager) SetOptimizerProviderScoreMetric(chainId string, apiInterface string, providerAddress string, epoch uint64, scoreType string, refactored bool, score float64) {
+	if pme == nil {
+		return
+	}
+
+	switch scoreType {
+	case scoreutils.LatencyScoreType:
+		if refactored {
+			pme.optimizerRefactorProviderLatency.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		} else {
+			pme.optimizerProviderLatency.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		}
+	case scoreutils.SyncScoreType:
+		if refactored {
+			pme.optimizerRefactorProviderSync.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		} else {
+			pme.optimizerProviderSync.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		}
+	case scoreutils.AvailabilityScoreType:
+		if refactored {
+			pme.optimizerRefactorProviderAvailability.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		} else {
+			pme.optimizerProviderAvailability.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		}
+	case scoreutils.TotalScoreType:
+		if refactored {
+			pme.optimizerRefactorProviderScore.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		} else {
+			pme.optimizerProviderScore.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(score)
+		}
+	default:
+		utils.LavaFormatError("Unknown score type", nil, utils.Attribute{Key: "scoreType", Value: scoreType})
+	}
+}
+
+func (pme *ConsumerMetricsManager) SetOptimizerProviderTierMetric(chainId string, apiInterface string, providerAddress string, epoch uint64, refactored bool, tier int) {
+	if pme == nil {
+		return
+	}
+	if refactored {
+		pme.optimizerRefactorProviderTier.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(float64(tier))
+	} else {
+		pme.optimizerProviderTier.WithLabelValues(chainId, apiInterface, providerAddress, fmt.Sprintf("%d", epoch)).Set(float64(tier))
+	}
+}
+
+func (pme *ConsumerMetricsManager) SetOptimizerTierChanceMetric(chainId string, apiInterface string, tier int, epoch uint64, refactored bool, chance float64) {
+	if pme == nil {
+		return
+	}
+	if refactored {
+		pme.optimizerRefactorTierChance.WithLabelValues(chainId, apiInterface, fmt.Sprintf("%d", tier), fmt.Sprintf("%d", epoch)).Set(chance)
+	} else {
+		pme.optimizerTierChance.WithLabelValues(chainId, apiInterface, fmt.Sprintf("%d", tier), fmt.Sprintf("%d", epoch)).Set(chance)
+	}
+}
+
 func (pme *ConsumerMetricsManager) SetLoLResponse(success bool) {
 	if pme == nil {
 		return
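Note: because epoch is a label, every epoch creates a fresh series per provider, so series cardinality grows over time. For reference, a minimal self-contained sketch of how one of the gauges above surfaces on /metrics (spec, provider address, port, and score value are invented for illustration):

package main

import (
	"fmt"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// same shape as the optimizer gauges above: one sample per
	// (spec, api_interface, provider_address, epoch) combination
	providerScore := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "lava_consumer_optimizer_provider_score",
		Help: "[Optimizer] The total score of a provider",
	}, []string{"spec", "api_interface", "provider_address", "epoch"})
	prometheus.MustRegister(providerScore)

	// the manager stringifies the epoch with fmt.Sprintf("%d", epoch)
	providerScore.WithLabelValues("LAV1", "rest", "lava@provider", fmt.Sprintf("%d", 42)).Set(0.87)

	http.Handle("/metrics", promhttp.Handler())
	http.ListenAndServe(":8080", nil)
}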
diff --git a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go
index 72183853d9..3215652177 100644
--- a/protocol/metrics/consumer_optimizer_qos_client.go
+++ b/protocol/metrics/consumer_optimizer_qos_client.go
@@ -41,6 +41,7 @@ type OptimizerQoSReport struct {
 	LatencyScore    float64
 	GenericScore    float64
 	EntryIndex      int
+	TierChances     string
 }
 
 type OptimizerQoSReportToSend struct {
@@ -57,6 +58,7 @@ type OptimizerQoSReportToSend struct {
 	Epoch         uint64 `json:"epoch"`
 	ProviderStake int64  `json:"provider_stake"`
 	EntryIndex    int    `json:"entry_index"`
+	TierChances   string `json:"tier_chances"`
 }
 
 func (oqosr OptimizerQoSReportToSend) String() string {
@@ -145,6 +147,7 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz
 		Epoch:         epoch,
 		NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress),
 		ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch),
+		TierChances:   report.TierChances,
 	}
 
 	coqc.queueSender.appendQueue(optimizerQoSReportToSend)
@@ -172,6 +175,7 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() []OptimizerQo
 			reportsToSend = append(reportsToSend, coqc.appendOptimizerQoSReport(report, chainId, currentEpoch))
 		}
 	}
+
 	return reportsToSend
 }
 
diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go
index f2986969c3..f93f9bfc19 100644
--- a/protocol/provideroptimizer/provider_optimizer.go
+++ b/protocol/provideroptimizer/provider_optimizer.go
@@ -216,6 +216,32 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
 	)
 }
 
+// CalculateQoSScoresForMetrics returns a per-provider QoS report annotated with its entry index and the tier chances.
+func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport {
+	selectionTier, _, providersScores := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
+	reports := []*metrics.OptimizerQoSReport{}
+
+	rawScores := selectionTier.GetRawScores()
+	tierChances := selectionTier.ShiftTierChance(po.OptimizerNumTiers, map[int]float64{0: ATierChance})
+	for idx, entry := range rawScores {
+		qosReport := providersScores[entry.Address]
+		qosReport.EntryIndex = idx
+		qosReport.TierChances = PrintTierChances(tierChances)
+		reports = append(reports, qosReport)
+	}
+
+	return reports
+}
+
+// PrintTierChances flattens the tier->chance map into a "tier: chance, " string (map iteration order is not guaranteed).
+func PrintTierChances(tierChances map[int]float64) string {
+	var tierChancesString string
+	for tier, chance := range tierChances {
+		tierChancesString += fmt.Sprintf("%d: %f, ", tier, chance)
+	}
+	return tierChancesString
+}
+
 func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration, map[string]*metrics.OptimizerQoSReport) {
 	explorationCandidate := Exploration{address: "", time: time.Now().Add(time.Hour)}
 	selectionTier := NewSelectionTier()
@@ -609,17 +635,3 @@ func (po *ProviderOptimizer) GetExcellenceQoSReportForProvider(providerAddress s
 
 	return report, providerData.Latency.GetLastUpdateTime()
 }
-
-func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport {
-	selectionTier, _, providersScores := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
-	reports := []*metrics.OptimizerQoSReport{}
-
-	rawScores := selectionTier.GetRawScores()
-	for idx, entry := range rawScores {
-		qosReport := providersScores[entry.Address]
-		qosReport.EntryIndex = idx
-		reports = append(reports, qosReport)
-	}
-
-	return reports
-}
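PrintTierChances emits a trailing-separator list such as "0: 0.750000, 1: 0.250000, ", and since Go map iteration order is unspecified, consumers should parse by tier key rather than by position. A small self-contained sketch of such a parser (the function name is ours, not part of this PR; plot_data.py below does the equivalent in Python):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseTierChances reverses the "tier: chance, " format emitted by
// PrintTierChances, skipping the empty element left by the trailing separator.
func parseTierChances(s string) map[int]float64 {
	out := map[int]float64{}
	for _, item := range strings.Split(s, ", ") {
		if item == "" {
			continue
		}
		parts := strings.SplitN(item, ": ", 2)
		if len(parts) != 2 {
			continue
		}
		tier, tierErr := strconv.Atoi(parts[0])
		chance, chanceErr := strconv.ParseFloat(parts[1], 64)
		if tierErr == nil && chanceErr == nil {
			out[tier] = chance
		}
	}
	return out
}

func main() {
	fmt.Println(parseTierChances("0: 0.750000, 1: 0.250000, ")) // map[0:0.75 1:0.25]
}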
diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go
index bbdc878120..7515b961c1 100644
--- a/protocol/rpcconsumer/rpcconsumer.go
+++ b/protocol/rpcconsumer/rpcconsumer.go
@@ -243,6 +243,13 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
 	for _, rpcEndpoint := range options.rpcEndpoints {
 		go func(rpcEndpoint *lavasession.RPCEndpoint) error {
 			defer wg.Done()
+			chainID := rpcEndpoint.ChainID
+			chainParser, err := chainlib.NewChainParser(rpcEndpoint.ApiInterface)
+			if err != nil {
+				err = utils.LavaFormatError("failed creating chain parser", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
+				errCh <- err
+				return err
+			}
 			rpcConsumerServer, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker, policyUpdaters, optimizers, consumerConsistencies, finalizationConsensuses, chainMutexes, options, privKey, lavaChainID, rpcConsumerMetrics, consumerReportsManager, consumerOptimizerQoSClient,
@@ -262,7 +269,94 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
 					}()
 				}
 			}
-			return err
+
+			err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, options.cmdFlags.StaticSpecPath, *rpcEndpoint, rpcc.consumerStateTracker)
+			if err != nil {
+				err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
+				errCh <- err
+				return err
+			}
+
+			_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
+			var optimizer *provideroptimizer.ProviderOptimizer
+			var consumerConsistency *ConsumerConsistency
+			var finalizationConsensus *finalizationconsensus.FinalizationConsensus
+			getOrCreateChainAssets := func() error {
+				// this is locked so we don't race optimizers creation
+				chainMutexes[chainID].Lock()
+				defer chainMutexes[chainID].Unlock()
+				var loaded bool
+				var err error
+
+				// Create / Use existing optimizer
+				newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, options.maxConcurrentProviders, consumerOptimizerQoSClient, chainID)
+				optimizer, loaded, err = optimizers.LoadOrStore(chainID, newOptimizer)
+				if err != nil {
+					return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
+				}
+
+				if !loaded {
+					// if this is a new optimizer, register it in the consumerOptimizerQoSClient
+					consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID)
+				}
+
+				// Create / Use existing ConsumerConsistency
+				newConsumerConsistency := NewConsumerConsistency(chainID)
+				consumerConsistency, _, err = consumerConsistencies.LoadOrStore(chainID, newConsumerConsistency)
+				if err != nil {
+					return utils.LavaFormatError("failed loading consumer consistency", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
+				}
+
+				// Create / Use existing FinalizationConsensus
+				newFinalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
+				finalizationConsensus, loaded, err = finalizationConsensuses.LoadOrStore(chainID, newFinalizationConsensus)
+				if err != nil {
+					return utils.LavaFormatError("failed loading finalization consensus", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
+				}
+				if !loaded { // when creating new finalization consensus instance we need to register it to updates
+					consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus)
+				}
+				return nil
+			}
+			err = getOrCreateChainAssets()
+			if err != nil {
+				errCh <- err
+				return err
+			}
+
+			if finalizationConsensus == nil || optimizer == nil || consumerConsistency == nil {
+				err = utils.LavaFormatError("failed getting assets, found a nil", nil, utils.Attribute{Key: "endpoint", Value: rpcEndpoint.Key()})
+				errCh <- err
+				return err
+			}
+
+			// Create active subscription provider storage for each unique chain
+			activeSubscriptionProvidersStorage := lavasession.NewActiveSubscriptionProvidersStorage()
+			consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String(), activeSubscriptionProvidersStorage)
+			// Register For Updates
+			rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, options.staticProvidersList)
+
+			var relaysMonitor *metrics.RelaysMonitor
+			if options.cmdFlags.RelaysHealthEnableFlag {
+				relaysMonitor = metrics.NewRelaysMonitor(options.cmdFlags.RelaysHealthIntervalFlag, rpcEndpoint.ChainID, rpcEndpoint.ApiInterface)
+				relaysMonitorAggregator.RegisterRelaysMonitor(rpcEndpoint.String(), relaysMonitor)
+			}
+
+			var consumerWsSubscriptionManager *chainlib.ConsumerWSSubscriptionManager
+			var specMethodType string
+			if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC {
+				specMethodType = http.MethodPost
+			}
+			consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager)
+
+			utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()})
+			err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager)
+			if err != nil {
+				err = utils.LavaFormatError("failed serving rpc requests", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
+				errCh <- err
+				return err
+			}
+			return nil
 		}(rpcEndpoint)
 	}
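The getOrCreateChainAssets closure above serializes asset creation per chain: the per-chain mutex makes LoadOrStore race-free, and the loaded flag guarantees registration happens exactly once, performed by the goroutine that created the object. A stripped-down sketch of that pattern with illustrative types (not the repo's actual ones):

package main

import (
	"fmt"
	"sync"
)

type asset struct{ chainID string }

type assetStore struct {
	mu     sync.Mutex
	assets map[string]*asset
}

// loadOrStore returns the existing asset for chainID or creates one;
// loaded reports whether the asset already existed.
func (s *assetStore) loadOrStore(chainID string) (a *asset, loaded bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if existing, ok := s.assets[chainID]; ok {
		return existing, true
	}
	a = &asset{chainID: chainID}
	s.assets[chainID] = a
	return a, false
}

func main() {
	s := &assetStore{assets: map[string]*asset{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, loaded := s.loadOrStore("LAV1"); !loaded {
				fmt.Println("created once; only the creator registers for updates")
			}
		}()
	}
	wg.Wait()
}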
diff --git a/scripts/test/httpServer.py b/scripts/test/httpServer.py
index ddbe4e77a4..66e5240417 100644
--- a/scripts/test/httpServer.py
+++ b/scripts/test/httpServer.py
@@ -1,14 +1,37 @@
+import json
+import csv
 from http.server import BaseHTTPRequestHandler, HTTPServer
 import sys
 
 payload_ret = "OK"
 
 class RequestHandler(BaseHTTPRequestHandler):
+    def __init__(self, *args, **kwargs):
+        self.csv_file_name = kwargs.pop('csv_file_name', 'data.csv')
+        super().__init__(*args, **kwargs)
+
     def do_GET(self):
         self.print_request()
 
     def do_POST(self):
-        self.print_request()
+        content_length = int(self.headers.get("Content-Length", 0))
+        if content_length > 0:
+            body = self.rfile.read(content_length)
+            data = json.loads(body.decode('utf-8'))
+            self.write_to_csv(data)
+        self.send_response(200)
+        self.send_header("Content-type", "text/html")
+        self.end_headers()
+        self.wfile.write(b"OK")
+
+    def write_to_csv(self, data):
+        with open(self.csv_file_name, 'a', newline='') as csvfile:
+            fieldnames = data[0].keys()
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+            if csvfile.tell() == 0:  # Check if file is empty to write header
+                writer.writeheader()
+            for entry in data:
+                writer.writerow(entry)
 
     def print_request(self):
         # Print request line
@@ -27,24 +50,14 @@ def print_request(self):
             body = self.rfile.read(content_length)
             print(f"Body:\n{body.decode('utf-8')}")
 
-        # Send a response back to the client
-        response = payload_ret.encode('utf-8')
-        self.send_response(200)
-        self.send_header("Content-type", "application/json")
-        self.end_headers()
-        self.wfile.write(response)
-
-def run_server(port=8000):
+def run_server(port=8000, csv_file_name='data.csv'):
     server_address = ('', port)
-    httpd = HTTPServer(server_address, RequestHandler)
-    print(f"Server running on port {port}")
+    handler = lambda *args, **kwargs: RequestHandler(*args, csv_file_name=csv_file_name, **kwargs)
+    httpd = HTTPServer(server_address, handler)
+    print(f"Server running on port {port}, writing to {csv_file_name}")
     httpd.serve_forever()
 
 if __name__ == '__main__':
-    if len(sys.argv) > 1:
-        port = int(sys.argv[1])
-        if len(sys.argv) > 2:
-            payload_ret = sys.argv[2]
-        run_server(port)
-    else:
-        run_server()
\ No newline at end of file
+    port = int(sys.argv[1]) if len(sys.argv) > 1 else 8000
+    csv_file_name = sys.argv[2] if len(sys.argv) > 2 else 'data.csv'
+    run_server(port, csv_file_name)
diff --git a/scripts/test/plot_data.py b/scripts/test/plot_data.py
new file mode 100644
index 0000000000..a3ed0607ac
--- /dev/null
+++ b/scripts/test/plot_data.py
@@ -0,0 +1,106 @@
+import csv
+import matplotlib.pyplot as plt
+from dateutil import parser
+import sys
+import random
+from collections import defaultdict
+
+def read_csv(file_path):
+    data = []
+    with open(file_path, 'r') as csvfile:
+        reader = csv.DictReader(csvfile)
+        for row in reader:
+            data.append(row)
+    return data
+
+def group_by_provider(data):
+    grouped_data = defaultdict(list)
+    for entry in data:
+        grouped_data[entry['provider']].append(entry)
+    return grouped_data
+
+def plot_graphs(data, provider, output_file, is_refactored=False):
+    timestamps = [parser.parse(entry['timestamp']) for entry in data]
+    sync_scores = [float(entry['sync_score']) for entry in data]
+    availability_scores = [float(entry['availability_score']) for entry in data]
+    latency_scores = [float(entry['latency_score']) for entry in data]
+    generic_scores = [float(entry['generic_score']) for entry in data]
+    node_error_rates = [float(entry['node_error_rate']) for entry in data]
+    provider_stake = data[0]['provider_stake']  # Assuming stake is constant
+
+    avg_node_error_rate = sum(node_error_rates) / len(node_error_rates)
+
+    plt.figure(figsize=(10, 5))
+    plt.plot(timestamps, sync_scores, label='Sync Score')
+    plt.plot(timestamps, availability_scores, label='Availability Score')
+    plt.plot(timestamps, latency_scores, label='Latency Score')
+    plt.plot(timestamps, generic_scores, label='Generic Score')
+    plt.xlabel('Timestamp')
+    plt.ylabel('Scores')
+    title_suffix = " (Refactor)" if is_refactored else ""
+    plt.title(f'Provider: {provider}, Avg Node Error Rate: {avg_node_error_rate:.2f}, Stake: {provider_stake}{title_suffix}')
+    plt.legend()
+    plt.xticks(rotation=45)
+    plt.tight_layout()
+    plt.savefig(output_file)
+    print(f"Plot saved as {output_file}")
+
+def plot_tier_chances_over_time(data, provider, output_file, is_refactored=False):
+    timestamps = [parser.parse(entry['timestamp']) for entry in data]
+    tier_chances_dicts = [
+        {int(k): float(v) for k, v in (item.split(': ') for item in entry['tier_chances'].split(', ') if item)}
+        for entry in data
+    ]
+
+    # Collect all unique tiers
+    all_tiers = set(tier for tier_chances in tier_chances_dicts for tier in tier_chances.keys())
+
+    plt.figure(figsize=(10, 5))
+    for tier in sorted(all_tiers):
+        tier_chances_over_time = [tier_chances.get(tier, 0) for tier_chances in tier_chances_dicts]
+        plt.plot(timestamps, tier_chances_over_time, label=f'Tier {tier}')
+
+    plt.xlabel('Timestamp')
+    plt.ylabel('Chance')
+    title_suffix = " (Refactor)" if is_refactored else ""
+    plt.title(f'Provider: {provider} Tier Chances Over Time{title_suffix}')
+    plt.legend()
+    plt.xticks(rotation=45)
+    plt.tight_layout()
+    plt.savefig(output_file)
+    print(f"Tier chances over time plot saved as {output_file}")
+
+if __name__ == '__main__':
+    if len(sys.argv) < 3:
+        print("Usage: python plot_data.py <current_csv_file> <refactored_csv_file>")
+        sys.exit(1)
+
+    current_csv_file = sys.argv[1]
+    refactored_csv_file = sys.argv[2]
+
+    current_data = read_csv(current_csv_file)
+    refactored_data = read_csv(refactored_csv_file)
+
+    current_grouped_data = group_by_provider(current_data)
+    refactored_grouped_data = group_by_provider(refactored_data)
+
+    # Randomly select one provider from the current mechanism data
+    selected_provider = random.choice(list(current_grouped_data.keys()))
+
+    # Ensure the selected provider exists in both datasets
+    if selected_provider not in refactored_grouped_data:
+        print(f"Provider {selected_provider} not found in refactored data.")
+        sys.exit(1)
+
+    current_selected_data = current_grouped_data[selected_provider]
+    refactored_selected_data = refactored_grouped_data[selected_provider]
+
+    # Extract the last 4 characters of the provider address
+    provider_suffix = selected_provider.split('@')[-1][-4:]
+
+    # Plot graphs for both datasets
+    plot_graphs(current_selected_data, selected_provider, f"provider_{provider_suffix}_current.png")
+    plot_tier_chances_over_time(current_selected_data, selected_provider, f"provider_{provider_suffix}_current_tier_chances.png")
+
+    plot_graphs(refactored_selected_data, selected_provider, f"provider_{provider_suffix}_refactor.png", is_refactored=True)
+    plot_tier_chances_over_time(refactored_selected_data, selected_provider, f"provider_{provider_suffix}_refactor_tier_chances.png", is_refactored=True)
\ No newline at end of file
diff --git a/utils/score/score_store.go b/utils/score/score_store.go
index 03ced5deba..e466851b0c 100644
--- a/utils/score/score_store.go
+++ b/utils/score/score_store.go
@@ -299,6 +299,7 @@ const (
 	LatencyScoreType      = "latency"
 	SyncScoreType         = "sync"
 	AvailabilityScoreType = "availability"
+	TotalScoreType        = "total"
 
 	// Worst score results for each QoS excellence metric for truncation
 	WorstLatencyScore float64 = 30 // seconds
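End-to-end, httpServer.py accepts a POSTed JSON array of reports and appends it to the CSV that plot_data.py consumes. A minimal sketch of a compatible client, using only the json tags visible in this diff (values are invented; the real OptimizerQoSReportToSend also carries the timestamp, provider, and score fields the plot script expects):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// report mirrors a subset of OptimizerQoSReportToSend's json tags
type report struct {
	Epoch         uint64 `json:"epoch"`
	ProviderStake int64  `json:"provider_stake"`
	EntryIndex    int    `json:"entry_index"`
	TierChances   string `json:"tier_chances"`
}

func main() {
	payload, err := json.Marshal([]report{{
		Epoch:         42,
		ProviderStake: 1000000,
		EntryIndex:    0,
		TierChances:   "0: 0.750000, 1: 0.250000, ",
	}})
	if err != nil {
		panic(err)
	}
	// httpServer.py appends each posted batch to its CSV file and replies "OK"
	resp, err := http.Post("http://localhost:8000", "application/json", bytes.NewReader(payload))
	if err != nil {
		fmt.Println("post failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}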