feat(telemetry)_: send connection failure metric #5518
@@ -31,6 +31,7 @@ const (
 	ReceivedMessagesMetric     TelemetryType = "ReceivedMessages"
 	ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
 	PeerCountMetric            TelemetryType = "PeerCount"
+	PeerConnFailuresMetric     TelemetryType = "PeerConnFailure"

 	MaxRetryCache = 5000
 )
@@ -58,7 +59,22 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi
 }

 func (c *Client) PushPeerCount(peerCount int) {
-	c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
+	if peerCount != c.lastPeerCount {
+		c.lastPeerCount = peerCount
+		c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
+	}
+}
+
+func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
+	for peerID, failures := range peerConnFailures {
+		if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists {
+			if failures == lastFailures {
+				continue
+			}
+		}
+		c.lastPeerConnFailures[peerID] = failures
+		c.processAndPushTelemetry(PeerConnFailure{FailedPeerId: peerID, FailureCount: failures})
+	}
 }

 type ReceivedMessages struct {
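Both functions in the hunk above push a metric only when its value has changed since the last push. The following stripped-down sketch isolates that change-detection pattern; the `reporter` type and its method are illustrative stand-ins, not the actual status-go `Client` API.

```go
package main

import "fmt"

// reporter models the per-peer deduplication in PushPeerConnFailures:
// a failure count is reported only when it differs from the last value
// seen for that peer.
type reporter struct {
	lastPeerConnFailures map[string]int
}

func (r *reporter) pushPeerConnFailures(failures map[string]int) {
	for peerID, count := range failures {
		if last, exists := r.lastPeerConnFailures[peerID]; exists && count == last {
			continue // unchanged since the last push, so no metric is sent
		}
		r.lastPeerConnFailures[peerID] = count
		fmt.Printf("send metric: peer=%s failures=%d\n", peerID, count)
	}
}

func main() {
	r := &reporter{lastPeerConnFailures: make(map[string]int)}
	r.pushPeerConnFailures(map[string]int{"peerA": 1}) // sent
	r.pushPeerConnFailures(map[string]int{"peerA": 1}) // suppressed: unchanged
	r.pushPeerConnFailures(map[string]int{"peerA": 2}) // sent: count changed
}
```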
@@ -71,21 +87,28 @@ type PeerCount struct {
 	PeerCount int
 }

+type PeerConnFailure struct {
+	FailedPeerId string
+	FailureCount int
+}
+
 type Client struct {
-	serverURL           string
-	httpClient          *http.Client
-	logger              *zap.Logger
-	keyUID              string
-	nodeName            string
-	peerId              string
-	version             string
-	telemetryCh         chan TelemetryRequest
-	telemetryCacheLock  sync.Mutex
-	telemetryCache      []TelemetryRequest
-	telemetryRetryCache []TelemetryRequest
-	nextIdLock          sync.Mutex
-	nextId              int
-	sendPeriod          time.Duration
+	serverURL            string
+	httpClient           *http.Client
+	logger               *zap.Logger
+	keyUID               string
+	nodeName             string
+	peerId               string
+	version              string
+	telemetryCh          chan TelemetryRequest
+	telemetryCacheLock   sync.Mutex
+	telemetryCache       []TelemetryRequest
+	telemetryRetryCache  []TelemetryRequest
+	nextIdLock           sync.Mutex
+	nextId               int
+	sendPeriod           time.Duration
+	lastPeerCount        int
+	lastPeerConnFailures map[string]int
 }

 type TelemetryClientOption func(*Client)
@@ -105,19 +128,21 @@ func WithPeerID(peerId string) TelemetryClientOption {
 func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
 	serverURL = strings.TrimRight(serverURL, "/")
 	client := &Client{
-		serverURL:           serverURL,
-		httpClient:          &http.Client{Timeout: time.Minute},
-		logger:              logger,
-		keyUID:              keyUID,
-		nodeName:            nodeName,
-		version:             version,
-		telemetryCh:         make(chan TelemetryRequest),
-		telemetryCacheLock:  sync.Mutex{},
-		telemetryCache:      make([]TelemetryRequest, 0),
-		telemetryRetryCache: make([]TelemetryRequest, 0),
-		nextId:              0,
-		nextIdLock:          sync.Mutex{},
-		sendPeriod:          10 * time.Second, // default value
+		serverURL:            serverURL,
+		httpClient:           &http.Client{Timeout: time.Minute},
+		logger:               logger,
+		keyUID:               keyUID,
+		nodeName:             nodeName,
+		version:              version,
+		telemetryCh:          make(chan TelemetryRequest),
+		telemetryCacheLock:   sync.Mutex{},
+		telemetryCache:       make([]TelemetryRequest, 0),
+		telemetryRetryCache:  make([]TelemetryRequest, 0),
+		nextId:               0,
+		nextIdLock:           sync.Mutex{},
+		sendPeriod:           10 * time.Second, // default value
+		lastPeerCount:        0,
+		lastPeerConnFailures: make(map[string]int),
 	}

 	for _, opt := range opts {
@@ -207,6 +232,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
 			TelemetryType: PeerCountMetric,
 			TelemetryData: c.ProcessPeerCount(v),
 		}
+	case PeerConnFailure:
+		telemetryRequest = TelemetryRequest{
+			Id:            c.nextId,
+			TelemetryType: PeerConnFailuresMetric,
+			TelemetryData: c.ProcessPeerConnFailure(v),
+		}
 	default:
 		c.logger.Error("Unknown telemetry data type")
 		return
@@ -340,6 +371,21 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
 	return &jsonRawMessage
 }

+func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
+	postBody := map[string]interface{}{
+		"failedPeerId":  peerConnFailure.FailedPeerId,
+		"failureCount":  peerConnFailure.FailureCount,
+		"nodeName":      c.nodeName,
+		"nodeKeyUID":    c.keyUID,
+		"peerId":        c.peerId,
+		"statusVersion": c.version,
+		"timestamp":     time.Now().Unix(),
+	}
+	body, _ := json.Marshal(postBody)
+	jsonRawMessage := json.RawMessage(body)
+	return &jsonRawMessage
+}
+
 func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
 	c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
 	url := fmt.Sprintf("%s/update-envelope", c.serverURL)

Review comment on lines +384 to +385:

We marshal and unmarshal here 🤔. And then we marshal again here: Line 230 in 780e3e5. It would be nice to reduce these procedures. We could change this; I guess it can be done out of this PR ofc.
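On the reviewer's point about the repeated marshaling: one possible shape for the suggestion is to let the payload field hold a plain value that `encoding/json` serializes together with the enclosing request, instead of the pre-marshaled `*json.RawMessage` the `Process*` functions produce today. The `telemetryRequest` type below is a hypothetical variant sketched for illustration only; the real `TelemetryRequest` in this codebase differs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical variant of TelemetryRequest: TelemetryData holds the raw
// payload value, so the whole request is serialized in a single
// json.Marshal call rather than marshaling the payload separately first.
type telemetryRequest struct {
	Id            int         `json:"id"`
	TelemetryType string      `json:"telemetry_type"`
	TelemetryData interface{} `json:"telemetry_data"`
}

func main() {
	req := telemetryRequest{
		Id:            1,
		TelemetryType: "PeerConnFailure",
		TelemetryData: map[string]interface{}{
			"failedPeerId": "peer-id-placeholder", // illustrative value
			"failureCount": 3,
		},
	}
	// One marshal covers both the envelope and the nested payload.
	body, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```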
@@ -109,8 +109,6 @@ func (w *Waku) broadcast() {
 		}
 	}

-	fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
-
 	// Wraps the publish function with a call to the telemetry client
 	if w.statusTelemetryClient != nil {
 		sendFn := fn
@@ -125,6 +123,9 @@ func (w *Waku) broadcast() {
 		}
 	}

+	// Wraps the publish function with rate limiter
+	fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
+
 	w.wg.Add(1)
 	go w.publishEnvelope(envelope, fn, logger)
 }

Review comment:

@richard-ramos I moved the rate limit wrapper after the telemetry wrapper so that rate limiting doesn't get caught as a publish error.
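To see why the wrapper order matters, here is a minimal, self-contained sketch of the composition; `publishFn`, `withTelemetry`, and `withRateLimit` are illustrative names, not the actual status-go functions. With the rate limiter applied last (outermost), a throttled publish fails before the telemetry wrapper ever runs, so it is not recorded as a publish error; under the previous order the telemetry wrapper was outermost and observed the limiter's error.

```go
package main

import (
	"errors"
	"fmt"
)

// publishFn stands in for the broadcast publish function.
type publishFn func(msg string) error

// withTelemetry mimics the telemetry wrapper: it runs the wrapped
// function and reports any error it returns as a publish error.
func withTelemetry(fn publishFn) publishFn {
	return func(msg string) error {
		err := fn(msg)
		if err != nil {
			fmt.Println("telemetry: recorded publish error:", err)
		}
		return err
	}
}

// withRateLimit mimics ThrottlePublishFn: when throttled, it rejects
// the publish without calling the wrapped function at all.
func withRateLimit(fn publishFn, throttled bool) publishFn {
	return func(msg string) error {
		if throttled {
			return errors.New("rate limited")
		}
		return fn(msg)
	}
}

func main() {
	base := publishFn(func(msg string) error { return nil })

	// Ordering after this PR: telemetry inner, rate limiter outermost.
	// The throttled publish fails in the outer wrapper; telemetry never runs.
	fn := withRateLimit(withTelemetry(base), true)
	fmt.Println(fn("hello")) // prints only the rate-limit error

	// Previous ordering: rate limiter inner, telemetry outermost.
	// Telemetry observes the limiter's error and records it.
	old := withTelemetry(withRateLimit(base, true))
	fmt.Println(old("hello"))
}
```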
Review comment: Just a suggestion, out of this PR.