feat_: add ability to process prometheus metrics in telemetry client #5782

Draft · wants to merge 4 commits into base: develop
3 changes: 2 additions & 1 deletion cmd/status-cli/util.go
@@ -153,7 +153,8 @@ func createAccountAndLogin(b *api.GethStatusBackend, rootDataDir, password strin
HTTPHost: "127.0.0.1",
HTTPPort: p.Port,
},
-TelemetryServerURL: p.TelemetryURL,
+TelemetryServerURL:                     p.TelemetryURL,
+WakuV2EnableMissingMessageVerification: true,
}
return b.CreateAccountAndLogin(req,
params.WithFleet(p.Fleet),
40 changes: 40 additions & 0 deletions telemetry/client.go
@@ -12,6 +12,8 @@ import (

"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"

"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"
@@ -94,6 +96,11 @@ type PeerConnFailure struct {
FailureCount int
}

type PrometheusMetricWrapper struct {
Typ TelemetryType
Data *json.RawMessage
}

type Client struct {
serverURL string
httpClient *http.Client
@@ -113,6 +120,8 @@ type Client struct {
lastPeerCountTime time.Time
lastPeerConnFailures map[string]int
deviceType string

promMetrics *PrometheusMetrics
}

type TelemetryClientOption func(*Client)
@@ -154,6 +163,14 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str
opt(client)
}

promMetrics := NewPrometheusMetrics(client.processAndPushTelemetry, TelemetryRecord{NodeName: nodeName, PeerID: client.peerId, StatusVersion: version, DeviceType: client.deviceType})
client.promMetrics = promMetrics

client.promMetrics.Register("waku_connected_peers", GaugeType, nil)
client.promMetrics.Register("waku2_envelopes_validated_total", CounterType, prometheus.Labels{})
client.promMetrics.Register("waku_lightpush_messages", CounterType, prometheus.Labels{})
client.promMetrics.Register("waku_lightpush_errors", CounterType, prometheus.Labels{})

return client
}
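
For orientation: a nil label filter (as with waku_connected_peers) forwards every series of the metric family, while a populated prometheus.Labels restricts the snapshot to matching series. A minimal sketch of a hypothetical extra registration — the label pair below is illustrative, not part of this PR:

```go
// Hypothetical: push only the series of this counter whose "type" label
// is "missing"; series with other label values would be skipped.
client.promMetrics.Register("waku2_envelopes_validated_total", CounterType,
	prometheus.Labels{"type": "missing"})
```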

@@ -205,6 +222,21 @@ func (c *Client) Start(ctx context.Context) {
}

}()

go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
fmt.Println("exit")
Review comment (Contributor): should probably remove

return
case <-ticker.C:
c.promMetrics.Snapshot()

}
}
}()
}
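
The snapshot goroutine is bound to the context passed to Start, so cancelling that context stops the 5-second push loop. A usage sketch, assuming an already-constructed client:

```go
// Sketch: drive the metrics loop for one minute, then shut it down.
ctx, cancel := context.WithCancel(context.Background())
client.Start(ctx) // spawns the ticker goroutine that calls promMetrics.Snapshot()
time.Sleep(time.Minute)
cancel() // unblocks <-ctx.Done() and the goroutine returns
```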

@@ -246,6 +278,13 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
case PrometheusMetricWrapper:
pmd := data.(PrometheusMetricWrapper)
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: pmd.Typ,
TelemetryData: pmd.Data,
}
default:
c.logger.Error("Unknown telemetry data type")
return
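
This PrometheusMetricWrapper case is fed by ToTelemetryRequest in telemetry/prometheus.go (below); a hand-rolled equivalent, with an illustrative payload, might look like:

```go
// Sketch: wrap a pre-marshalled JSON payload and hand it to the pipeline.
raw := json.RawMessage(`{"name":"waku_connected_peers","value":42}`)
c.processAndPushTelemetry(ctx, PrometheusMetricWrapper{
	Typ:  "PrometheusMetric", // matches the Typ set by ToTelemetryRequest
	Data: &raw,
})
```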
@@ -390,6 +429,7 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
if processingError != nil {
errorString = processingError.Error()
}

postBody := map[string]interface{}{
"messageHash": types.EncodeHex(shhMessage.Hash),
"sentAt": shhMessage.Timestamp,
139 changes: 139 additions & 0 deletions telemetry/prometheus.go
@@ -0,0 +1,139 @@
package telemetry

import (
"context"
"encoding/json"
"log"
"time"

"github.com/prometheus/client_golang/prometheus"
prom_model "github.com/prometheus/client_model/go"
)

type MetricType int

const (
_ MetricType = iota
CounterType
GaugeType
)

type TelemetryRecord struct {
NodeName string `json:"nodeName"`
PeerID string `json:"peerId"`
StatusVersion string `json:"statusVersion"`
DeviceType string `json:"deviceType"`
}

type ProcessTelemetryRequest func(ctx context.Context, data interface{})

type MetricPayload struct {
Name string
Value []*prom_model.Metric
}

type Metric struct {
typ MetricType
labels map[string]string
}

type PrometheusMetrics struct {
metrics map[string]Metric
process ProcessTelemetryRequest
telemetryRecord TelemetryRecord
}

func NewPrometheusMetrics(process ProcessTelemetryRequest, tc TelemetryRecord) *PrometheusMetrics {
return &PrometheusMetrics{
metrics: make(map[string]Metric),
process: process,
telemetryRecord: tc,
}
}

func (pm *PrometheusMetrics) Register(name string, typ MetricType, labels prometheus.Labels) {
pm.metrics[name] = Metric{typ, labels}
}

func (pm *PrometheusMetrics) Snapshot() {
gatherer := prometheus.DefaultGatherer
metrics, err := gatherer.Gather()
if err != nil {
log.Fatalf("Failed to gather metrics: %v", err)
Review comment (@richard-ramos, Member, Aug 29, 2024): Not sure if we should use Fatalf. If I remember correctly, this panics, and IMO not being able to push metrics isn't an end-of-the-world situation!

}

for _, mf := range metrics {
metric, ok := pm.metrics[*mf.Name]
if !ok {
continue
}

metricFamilyValue := mf.GetMetric()

if len(metricFamilyValue) == 0 {
continue
}

metricValue := []*prom_model.Metric{}

if metric.labels != nil { // filter out metrics based on labels
for _, m := range mf.GetMetric() {

matchCnt := len(metric.labels)

for name, value := range metric.labels {
for _, label := range m.GetLabel() {
if name == *label.Name && value == *label.Value {
matchCnt--
}
}
}

if matchCnt > 0 {
continue
}

metricValue = append(metricValue, m)

}
} else {
metricValue = metricFamilyValue
}

if len(metricValue) == 0 {
continue
}

p := MetricPayload{Name: *mf.Name, Value: metricValue}

pm.ToTelemetryRequest(p)
}

}

func (pm *PrometheusMetrics) ToTelemetryRequest(p MetricPayload) error {
postBody := map[string]interface{}{
"value": p.Value,
"name": p.Name,
"nodeName": pm.telemetryRecord.NodeName,
"deviceType": pm.telemetryRecord.DeviceType,
"peerId": pm.telemetryRecord.PeerID,
"statusVersion": pm.telemetryRecord.StatusVersion,
"timestamp": time.Now().Unix(),
}

telemtryData, err := json.Marshal(postBody)
if err != nil {
return err
}

rawData := json.RawMessage(telemtryData)

wrap := PrometheusMetricWrapper{
Typ: "PrometheusMetric",
Data: &rawData,
}

pm.process(context.Background(), wrap)
return nil
}
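
To make the label matching in Snapshot concrete, here is a self-contained sketch that mirrors its subset-match filtering against a private registry; the metric name is the one registered above, but the topic and label values are made up:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	prom_model "github.com/prometheus/client_model/go"
)

func main() {
	reg := prometheus.NewRegistry()
	c := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "waku2_envelopes_validated_total",
		Help: "Number of envelopes processed successfully.",
	}, []string{"pubsubTopic", "type"})
	reg.MustRegister(c)

	c.WithLabelValues("/waku/2/example", "relay").Inc()
	c.WithLabelValues("/waku/2/example", "store").Add(2)

	// Keep only series carrying type="store" — the same matchCnt
	// countdown Snapshot uses to decide whether a series survives.
	want := prometheus.Labels{"type": "store"}
	families, err := reg.Gather()
	if err != nil {
		fmt.Println("gather failed:", err)
		return
	}
	for _, mf := range families {
		kept := []*prom_model.Metric{}
		for _, m := range mf.GetMetric() {
			matchCnt := len(want)
			for name, value := range want {
				for _, lp := range m.GetLabel() {
					if name == lp.GetName() && value == lp.GetValue() {
						matchCnt--
					}
				}
			}
			if matchCnt == 0 {
				kept = append(kept, m)
			}
		}
		fmt.Printf("%s: kept %d of %d series\n", mf.GetName(), len(kept), len(mf.GetMetric()))
	}
}
```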
6 changes: 3 additions & 3 deletions wakuv2/common/metrics.go
@@ -27,10 +27,10 @@ var (
Name: "waku2_envelopes_received_total",
Help: "Number of envelopes received.",
})
-EnvelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{
+EnvelopesValidatedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_envelopes_validated_total",
Help: "Number of envelopes processed successfully.",
-})
+}, []string{"pubsubTopic", "type"})
EnvelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "waku2_envelopes_rejected_total",
Help: "Number of envelopes rejected.",
@@ -52,7 +52,7 @@ var (

func init() {
prom.MustRegister(EnvelopesReceivedCounter)
-prom.MustRegister(EnvelopesRejectedCounter)

Review comment (Contributor): Was this metric supposed to be removed?

prom.MustRegister(EnvelopesValidatedCounter)
prom.MustRegister(EnvelopesCacheFailedCounter)
prom.MustRegister(EnvelopesCachedCounter)
prom.MustRegister(EnvelopesSizeMeter)
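
Since EnvelopesValidatedCounter is now a CounterVec, every call site must supply both labels. A sketch of the before/after call shape, where topic and msgType stand in for real string values:

```go
// Before the change: a plain Counter, no labels.
// common.EnvelopesValidatedCounter.Inc()

// After: both label values are required. The two forms are equivalent,
// but WithLabelValues must follow the declaration order of the Vec.
common.EnvelopesValidatedCounter.
	With(prometheus.Labels{"pubsubTopic": topic, "type": msgType}).Inc()
common.EnvelopesValidatedCounter.WithLabelValues(topic, msgType).Inc()
```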
8 changes: 6 additions & 2 deletions wakuv2/waku.go
@@ -37,6 +37,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"

"go.uber.org/zap"

@@ -274,6 +275,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
node.WithLogLevel(logger.Level()),
node.WithClusterID(cfg.ClusterID),
node.WithMaxMsgSize(1024 * 1024),
node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
}

if cfg.EnableDiscV5 {
Expand Down Expand Up @@ -1105,10 +1107,11 @@ func (w *Waku) Start() error {
w.logger)

w.missingMsgVerifier.Start(w.ctx)
w.logger.Info("Started missing message verifier")

w.wg.Add(1)
go func() {
-w.wg.Done()
+defer w.wg.Done()

Review comment (Contributor): not sure how this wait group is intended to behave cc @richard-ramos

Review comment (@richard-ramos, Member): Adding the defer is correct. This is a bug I introduced when doing the refactoring.

for {
select {
case <-w.ctx.Done():
@@ -1118,6 +1121,7 @@
if err != nil {
w.logger.Error("OnNewEnvelopes error", zap.Error(err))
}
w.logger.Info("Got a missing message!")
}
}
}()
@@ -1347,7 +1351,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
trouble = true
}

-common.EnvelopesValidatedCounter.Inc()
+common.EnvelopesValidatedCounter.With(prometheus.Labels{"pubsubTopic": envelope.PubsubTopic(), "type": msgType}).Inc()

Review comment (Contributor): Note that we need the meaning of common.MissingMessageType to be consistent moving forward, otherwise it can mess up our metrics, i.e. only messages returned by the periodic store query can have this type.


if trouble {
return errors.New("received invalid envelope")