diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 20e9ab54461..7cc11b8a6bb 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -26,6 +26,13 @@ import ( v1protocol "github.com/status-im/status-go/protocol/v1" ) +type TelemetryService interface { + PushRawMessageByType(ctx context.Context, msg struct { + MessageType string + Size uint32 + }) +} + // Whisper message properties. const ( whisperTTL = 15 @@ -88,6 +95,8 @@ type MessageSender struct { // handleSharedSecrets is a callback that is called every time a new shared secret is negotiated handleSharedSecrets func([]*sharedsecret.Secret) error + + telemetryClient TelemetryService } func NewMessageSender( @@ -113,6 +122,10 @@ func NewMessageSender( return p, nil } +func (s *MessageSender) WithTelemetryClient(client TelemetryService) { + s.telemetryClient = client +} + func (s *MessageSender) Stop() { s.messageEventsSubscriptionsMutex.Lock() defer s.messageEventsSubscriptionsMutex.Unlock() @@ -432,6 +445,9 @@ func (s *MessageSender) sendCommunity( zap.String("messageType", "community"), zap.Any("contentType", rawMessage.MessageType), zap.Strings("hashes", types.EncodeHexes(hashes))) + if s.telemetryClient != nil { + s.sendBandwidthMetric(ctx, rawMessage) + } s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -550,6 +566,10 @@ func (s *MessageSender) sendPrivate( s.transport.Track(messageID, hashes, newMessages) } + if s.telemetryClient != nil { + s.sendBandwidthMetric(ctx, rawMessage) + } + return messageID, nil } @@ -578,6 +598,9 @@ func (s *MessageSender) SendPairInstallation( return nil, errors.Wrap(err, "failed to send a message spec") } + if s.telemetryClient != nil { + s.sendBandwidthMetric(ctx, &rawMessage) + } s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -808,6 +831,9 @@ func (s *MessageSender) SendPublic( zap.Any("contentType", rawMessage.MessageType), zap.String("messageType", "public"), zap.Strings("hashes", types.EncodeHexes(hashes))) + if s.telemetryClient != nil { + s.sendBandwidthMetric(ctx, &rawMessage) + } s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -1381,3 +1407,13 @@ func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error { return nil } + +func (s *MessageSender) sendBandwidthMetric(ctx context.Context, rawMessage *RawMessage) { + s.telemetryClient.PushRawMessageByType(ctx, struct { + MessageType string + Size uint32 + }{ + MessageType: rawMessage.MessageType.String(), + Size: uint32(len(rawMessage.Payload)), + }) +} diff --git a/protocol/common/message_sender_test.go b/protocol/common/message_sender_test.go index 09a299fe5e1..61397d60e61 100644 --- a/protocol/common/message_sender_test.go +++ b/protocol/common/message_sender_test.go @@ -1,6 +1,7 @@ package common import ( + "context" "math" "testing" @@ -40,6 +41,14 @@ type MessageSenderSuite struct { logger *zap.Logger } +type mockTelemetryService struct{} + +func (m *mockTelemetryService) PushRawMessageByType(ctx context.Context, msg struct { + MessageType string + Size uint32 +}) { +} + func (s *MessageSenderSuite) SetupTest() { s.testMessage = protobuf.ChatMessage{ Text: "abc123", @@ -95,6 +104,9 @@ func (s *MessageSenderSuite) SetupTest() { Datasync: true, }, ) + + mockTelemetry := &mockTelemetryService{} + s.sender.WithTelemetryClient(mockTelemetry) s.Require().NoError(err) } diff --git a/protocol/messenger.go b/protocol/messenger.go index 77f5f2c025d..ca25183c3c0 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -533,6 +533,8 @@ func NewMessenger( c.wakuService.SetStatusTelemetryClient(telemetryClient) } telemetryClient.Start(ctx) + + sender.WithTelemetryClient(telemetryClient) } messenger = &Messenger{ diff --git a/protocol/messenger_builder_test.go b/protocol/messenger_builder_test.go index 894b706645a..af7a768c175 100644 --- a/protocol/messenger_builder_test.go +++ b/protocol/messenger_builder_test.go @@ -1,6 +1,7 @@ package protocol import ( + "context" "crypto/ecdsa" "github.com/google/uuid" @@ -52,6 +53,14 @@ func (tmc *testMessengerConfig) complete() error { return nil } +type mockTelemetryService struct{} + +func (m *mockTelemetryService) PushRawMessageByType(ctx context.Context, msg struct { + MessageType string + Size uint32 +}) { +} + func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger, error) { err := config.complete() if err != nil { @@ -96,6 +105,9 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger, "testVersion", options..., ) + + mockTelemetry := &mockTelemetryService{} + m.sender.WithTelemetryClient(mockTelemetry) if err != nil { return nil, err } diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index b47149f364e..19035828ef0 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -415,6 +415,15 @@ func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.P } m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes))) + if m.telemetryClient != nil { + m.telemetryClient.PushRawMessageByType(ctx, struct { + MessageType string + Size uint32 + }{ + MessageType: "DATASYNC", + Size: uint32(len(marshalledPayload)), + }) + } m.transport.TrackMany(messageIDs, hashes, newMessages) return nil diff --git a/telemetry/client.go b/telemetry/client.go index 5ded3e7b748..51f2226f558 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -60,6 +60,8 @@ const ( MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" // Total number and size of Waku messages sent by this node SentMessageTotalMetric TelemetryType = "SentMessageTotal" + // Size and type of raw message successfully returned by dispatchMessage + RawMessageByTypeMetric TelemetryType = "RawMessageByType" ) const MaxRetryCache = 5000 @@ -151,6 +153,13 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) { c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize}) } +func (c *Client) PushRawMessageByType(ctx context.Context, msg struct { + MessageType string + Size uint32 +}) { + c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: msg.MessageType, Size: msg.Size}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -206,6 +215,11 @@ type SentMessageTotal struct { Size uint32 } +type RawMessageByType struct { + MessageType string + Size uint32 +} + type Client struct { serverURL string httpClient *http.Client @@ -287,6 +301,7 @@ func (c *Client) Start(ctx context.Context) { } } }() + go func() { defer common.LogOnPanic() sendPeriod := c.sendPeriod @@ -317,7 +332,6 @@ func (c *Client) Start(ctx context.Context) { return } } - }() } @@ -408,6 +422,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: SentMessageTotalMetric, TelemetryData: c.ProcessSentMessageTotal(v), } + case RawMessageByType: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: RawMessageByTypeMetric, + TelemetryData: c.ProcessRawMessageByType(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -589,6 +609,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso return c.marshalPostBody(postBody) } +func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage { + postBody := c.commonPostBody() + postBody["messageType"] = rawMessageByType.MessageType + postBody["size"] = rawMessageByType.Size + return c.marshalPostBody(postBody) +} + // Helper function to marshal post body and handle errors func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage { body, err := json.Marshal(postBody) diff --git a/telemetry/client_test.go b/telemetry/client_test.go index 7c1e6e7848d..deeb34368aa 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "os" + "reflect" "slices" "sync" "testing" @@ -477,11 +478,29 @@ type testCase struct { expectedFields map[string]interface{} } -func runTestCase(t *testing.T, tc testCase) { +func runTestCase(t *testing.T, tc testCase, methodName string) { ctx := context.Background() client := createClient(t, "") - go client.processAndPushTelemetry(ctx, tc.input) + if methodName == "" { + // Default to just calling processAndPushTelemetry directly + go client.processAndPushTelemetry(ctx, tc.input) + } else { + // Get the method by name using reflection + method := reflect.ValueOf(client).MethodByName(methodName) + if !method.IsValid() { + t.Fatalf("Method %s not found on Client", methodName) + } + + // Create the arguments for the method call + args := []reflect.Value{ + reflect.ValueOf(ctx), + reflect.ValueOf(tc.input), + } + + // Call the method + go method.Call(args) + } telemetryRequest := <-client.telemetryCh @@ -514,16 +533,14 @@ func runTestCase(t *testing.T, tc testCase) { func TestProcessMessageDeliveryConfirmed(t *testing.T) { tc := testCase{ - name: "MessageDeliveryConfirmed", - input: MessageDeliveryConfirmed{ - MessageHash: "0x1234567890abcdef", - }, + name: "MessageDeliveryConfirmed", + input: "0x1234567890abcdef", expectedType: MessageDeliveryConfirmedMetric, expectedFields: map[string]interface{}{ "messageHash": "0x1234567890abcdef", }, } - runTestCase(t, tc) + runTestCase(t, tc, "PushMessageDeliveryConfirmed") } func TestProcessMissedRelevantMessage(t *testing.T) { @@ -539,10 +556,8 @@ func TestProcessMissedRelevantMessage(t *testing.T) { common.MissingMessageType, ) tc := testCase{ - name: "MissedRelevantMessage", - input: MissedRelevantMessage{ - ReceivedMessage: message, - }, + name: "MissedRelevantMessage", + input: message, expectedType: MissedRelevantMessageMetric, expectedFields: map[string]interface{}{ "messageHash": message.Envelope.Hash().String(), @@ -550,7 +565,7 @@ func TestProcessMissedRelevantMessage(t *testing.T) { "contentTopic": "0x12345679", }, } - runTestCase(t, tc) + runTestCase(t, tc, "PushMissedRelevantMessage") } func TestProcessMissedMessage(t *testing.T) { @@ -566,10 +581,8 @@ func TestProcessMissedMessage(t *testing.T) { common.MissingMessageType, ) tc := testCase{ - name: "MissedMessage", - input: MissedMessage{ - Envelope: message.Envelope, - }, + name: "MissedMessage", + input: message.Envelope, expectedType: MissedMessageMetric, expectedFields: map[string]interface{}{ "messageHash": message.Envelope.Hash().String(), @@ -577,15 +590,15 @@ func TestProcessMissedMessage(t *testing.T) { "contentTopic": message.Envelope.Message().ContentTopic, }, } - runTestCase(t, tc) + runTestCase(t, tc, "PushMissedMessage") } func TestProcessDialFailure(t *testing.T) { tc := testCase{ name: "DialFailure", - input: DialFailure{ - ErrorType: common.ErrorUnknown, - ErrorMsg: "test error message", + input: common.DialError{ + ErrType: common.ErrorUnknown, + ErrMsg: "test error message", Protocols: "test-protocols", }, expectedType: DialFailureMetric, @@ -595,19 +608,36 @@ func TestProcessDialFailure(t *testing.T) { "protocols": "test-protocols", }, } - runTestCase(t, tc) + runTestCase(t, tc, "PushDialFailure") } func TestProcessSentMessageTotal(t *testing.T) { tc := testCase{ - name: "SentMessageTotal", - input: SentMessageTotal{ - Size: uint32(1234), - }, + name: "SentMessageTotal", + input: uint32(1234), expectedType: SentMessageTotalMetric, expectedFields: map[string]interface{}{ "size": float64(1234), }, } - runTestCase(t, tc) + runTestCase(t, tc, "PushSentMessageTotal") +} + +func TestProcessRawMessageByType(t *testing.T) { + tc := testCase{ + name: "RawMessageByType", + input: struct { + MessageType string + Size uint32 + }{ + MessageType: "test-message-type", + Size: 1234, + }, + expectedType: RawMessageByTypeMetric, + expectedFields: map[string]interface{}{ + "messageType": "test-message-type", + "size": float64(1234), + }, + } + runTestCase(t, tc, "PushRawMessageByType") } diff --git a/wakuv2/api.go b/wakuv2/api.go index 09ea54ae03b..3fe93178de8 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -23,6 +23,8 @@ import ( "crypto/ecdsa" "errors" "fmt" + "runtime/debug" + "strings" "sync" "time" @@ -190,6 +192,9 @@ type NewMessage struct { // Post posts a message on the Waku network. // returns the hash of the message in case of success. func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Bytes, error) { + if !strings.Contains(string(debug.Stack()), "message_sender.go") { + logutils.ZapLogger().Debug("AK stack trace", zap.String("stack", string(debug.Stack()))) + } var ( symKeyGiven = len(req.SymKeyID) > 0 pubKeyGiven = len(req.PublicKey) > 0