Skip to content

Commit

Permalink
feat: PRT - dappid and consumer ip added to protocol message. (#1643)
Browse files Browse the repository at this point in the history
* feat: PRT - dappid and consumer ip added to protocol message.

* fix test

* fixing comments

* fix test

* adding comment for wrapping protocol message.

* add user data everywhere

* fix me

* fix lint

* fix all mocked tests

---------

Co-authored-by: Omer <[email protected]>
  • Loading branch information
ranlavanet and omerlavanet authored Aug 26, 2024
1 parent 359da07 commit 0d1806d
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 125 deletions.
4 changes: 1 addition & 3 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,10 @@ type RelaySender interface {
) (ProtocolMessage, error)
SendParsedRelay(
ctx context.Context,
dappID string,
consumerIp string,
analytics *metrics.RelayMetrics,
protocolMessage ProtocolMessage,
) (relayResult *common.RelayResult, errRet error)
CreateDappKey(dappID, consumerIp string) string
CreateDappKey(userData common.UserData) string
CancelSubscriptionContext(subscriptionKey string)
SetConsistencySeenBlock(blockSeen int64, key string)
}
Expand Down
16 changes: 8 additions & 8 deletions protocol/chainlib/chainlib_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
continue
} else {
// Normal relay over websocket. (not subscription related)
relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, dappID, userIp, metricsData, protocolMessage)
relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, metricsData, protocolMessage)
if err != nil {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not send parsed relay", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
Expand Down
17 changes: 9 additions & 8 deletions protocol/chainlib/consumer_ws_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
utils.LogAttr("dappKey", dappKey),
)

relayResult, err := cwsm.relaySender.SendParsedRelay(webSocketCtx, dappID, consumerIp, metricsData, protocolMessage)
relayResult, err := cwsm.relaySender.SendParsedRelay(webSocketCtx, metricsData, protocolMessage)
if err != nil {
onSubscriptionFailure()
return nil, nil, utils.LavaFormatError("could not send subscription relay", err)
Expand Down Expand Up @@ -490,7 +490,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
}

unsubscribeRelayCtx := utils.WithUniqueIdentifier(context.Background(), utils.GenerateUniqueIdentifier())
err = cwsm.sendUnsubscribeMessage(unsubscribeRelayCtx, dappID, userIp, protocolMessage, metricsData)
err = cwsm.sendUnsubscribeMessage(unsubscribeRelayCtx, protocolMessage, metricsData)
if err != nil {
utils.LavaFormatError("could not send unsubscribe message due to a relay error",
err,
Expand Down Expand Up @@ -702,15 +702,16 @@ func (cwsm *ConsumerWSSubscriptionManager) craftUnsubscribeMessage(hashedParams,
return protocolMessage, nil
}

func (cwsm *ConsumerWSSubscriptionManager) sendUnsubscribeMessage(ctx context.Context, dappID, consumerIp string, protocolMessage ProtocolMessage, metricsData *metrics.RelayMetrics) error {
func (cwsm *ConsumerWSSubscriptionManager) sendUnsubscribeMessage(ctx context.Context, protocolMessage ProtocolMessage, metricsData *metrics.RelayMetrics) error {
// Send the crafted unsubscribe relay
userData := protocolMessage.GetUserData()
utils.LavaFormatTrace("sending unsubscribe relay",
utils.LogAttr("GUID", ctx),
utils.LogAttr("dappID", dappID),
utils.LogAttr("consumerIp", consumerIp),
utils.LogAttr("dappID", userData.DappId),
utils.LogAttr("consumerIp", userData.ConsumerIp),
)

_, err := cwsm.relaySender.SendParsedRelay(ctx, dappID, consumerIp, metricsData, protocolMessage)
_, err := cwsm.relaySender.SendParsedRelay(ctx, metricsData, protocolMessage)
if err != nil {
return utils.LavaFormatError("could not send unsubscribe relay", err)
}
Expand All @@ -733,8 +734,8 @@ func (cwsm *ConsumerWSSubscriptionManager) connectDappWithSubscription(dappKey s
cwsm.connectedDapps[dappKey][hashedParams] = webSocketChan
}

func (cwsm *ConsumerWSSubscriptionManager) CreateWebSocketConnectionUniqueKey(dappID, consumerIp, webSocketConnectionUniqueId string) string {
return cwsm.relaySender.CreateDappKey(dappID, consumerIp) + "__" + webSocketConnectionUniqueId
func (cwsm *ConsumerWSSubscriptionManager) CreateWebSocketConnectionUniqueKey(dappID, consumerIp string, webSocketConnectionUniqueId string) string {
return cwsm.relaySender.CreateDappKey(common.UserData{DappId: dappID, ConsumerIp: consumerIp}) + "__" + webSocketConnectionUniqueId
}

func (cwsm *ConsumerWSSubscriptionManager) UnsubscribeAll(webSocketCtx context.Context, dappID, consumerIp string, webSocketConnectionUniqueId string, metricsData *metrics.RelayMetrics) error {
Expand Down
51 changes: 26 additions & 25 deletions protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes

chainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil)
protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil, dapp, ip)
relaySender := NewMockRelaySender(ctrl)
relaySender.
EXPECT().
CreateDappKey(gomock.Any(), gomock.Any()).
DoAndReturn(func(dappID, consumerIp string) string {
return dappID + consumerIp
CreateDappKey(gomock.Any()).
DoAndReturn(func(userData common.UserData) string {
return userData.DappId + userData.ConsumerIp
}).
AnyTimes()
relaySender.
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes

relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()).
Return(relayResult1, nil).
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

Expand Down Expand Up @@ -224,13 +224,13 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {

chainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil)
protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil, dapp, "")
relaySender := NewMockRelaySender(ctrl)
relaySender.
EXPECT().
CreateDappKey(gomock.Any(), gomock.Any()).
DoAndReturn(func(dappID, consumerIp string) string {
return dappID + consumerIp
CreateDappKey(gomock.Any()).
DoAndReturn(func(userData common.UserData) string {
return userData.DappId + userData.ConsumerIp
}).
AnyTimes()
relaySender.
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {

relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()).
Return(relayResult1, nil).
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

Expand Down Expand Up @@ -430,14 +430,15 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
subscribeChainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)

subscribeProtocolMessage1 := NewProtocolMessage(subscribeChainMessage1, nil, nil)
subscribeProtocolMessage1 := NewProtocolMessage(subscribeChainMessage1, nil, nil, dapp1, ts.Consumer.Addr.String())
unsubscribeProtocolMessage1 := NewProtocolMessage(unsubscribeChainMessage1, nil, &pairingtypes.RelayPrivateData{
Data: play.unsubscribeMessage1,
})
}, dapp1, ts.Consumer.Addr.String(),
)
relaySender := NewMockRelaySender(ctrl)
relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool {
SendParsedRelay(gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool {
protocolMsg, ok := x.(ProtocolMessage)
require.True(t, ok)
require.NotNil(t, protocolMsg)
Expand All @@ -455,9 +456,9 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

relaySender.
EXPECT().
CreateDappKey(gomock.Any(), gomock.Any()).
DoAndReturn(func(dappID, consumerIp string) string {
return dappID + consumerIp
CreateDappKey(gomock.Any()).
DoAndReturn(func(userData common.UserData) string {
return userData.DappId + userData.ConsumerIp
}).
AnyTimes()

Expand Down Expand Up @@ -530,7 +531,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()).
Return(relayResult1, nil).
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

Expand All @@ -552,7 +553,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()).
Return(relayResult1, nil).
Times(0) // Should not call SendParsedRelay, because it is already subscribed

Expand All @@ -578,10 +579,10 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
// Prepare for the next subscription
unsubscribeChainMessage2, err := chainParser.ParseMsg("", play.unsubscribeMessage2, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
unsubscribeProtocolMessage2 := NewProtocolMessage(unsubscribeChainMessage2, nil, &pairingtypes.RelayPrivateData{Data: play.unsubscribeMessage2})
unsubscribeProtocolMessage2 := NewProtocolMessage(unsubscribeChainMessage2, nil, &pairingtypes.RelayPrivateData{Data: play.unsubscribeMessage2}, dapp2, ts.Consumer.Addr.String())
subscribeChainMessage2, err := chainParser.ParseMsg("", play.subscriptionRequestData2, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
subscribeProtocolMessage2 := NewProtocolMessage(subscribeChainMessage2, nil, nil)
subscribeProtocolMessage2 := NewProtocolMessage(subscribeChainMessage2, nil, nil, dapp2, ts.Consumer.Addr.String())
relaySender.
EXPECT().
ParseRelay(gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool {
Expand Down Expand Up @@ -644,7 +645,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()).
Return(relayResult2, nil).
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

Expand All @@ -664,12 +665,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
// Prepare for unsubscribe from the first subscription
relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()).
Return(relayResult1, nil).
Times(0) // Should call SendParsedRelay, because it unsubscribed

ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData)
unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String())
err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil)
require.NoError(t, err)

Expand All @@ -688,8 +689,8 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
// Prepare for unsubscribe from the second subscription
relaySender.
EXPECT().
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, dappID string, consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (relayResult *common.RelayResult, errRet error) {
SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (relayResult *common.RelayResult, errRet error) {
wg.Done()
return relayResult2, nil
}).
Expand Down
9 changes: 8 additions & 1 deletion protocol/chainlib/protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ type BaseProtocolMessage struct {
ChainMessage
directiveHeaders map[string]string
relayRequestData *pairingtypes.RelayPrivateData
userData common.UserData
}

func (bpm *BaseProtocolMessage) GetUserData() common.UserData {
return bpm.userData
}

func (bpm *BaseProtocolMessage) GetDirectiveHeaders() map[string]string {
Expand Down Expand Up @@ -39,11 +44,12 @@ func (bpm *BaseProtocolMessage) GetBlockedProviders() []string {
return nil
}

func NewProtocolMessage(chainMessage ChainMessage, directiveHeaders map[string]string, relayRequestData *pairingtypes.RelayPrivateData) ProtocolMessage {
func NewProtocolMessage(chainMessage ChainMessage, directiveHeaders map[string]string, relayRequestData *pairingtypes.RelayPrivateData, dappId, consumerIp string) ProtocolMessage {
return &BaseProtocolMessage{
ChainMessage: chainMessage,
directiveHeaders: directiveHeaders,
relayRequestData: relayRequestData,
userData: common.UserData{DappId: dappId, ConsumerIp: consumerIp},
}
}

Expand All @@ -53,4 +59,5 @@ type ProtocolMessage interface {
RelayPrivateData() *pairingtypes.RelayPrivateData
HashCacheRequest(chainId string) ([]byte, func([]byte) []byte, error)
GetBlockedProviders() []string
GetUserData() common.UserData
}
9 changes: 7 additions & 2 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ var SPECIAL_LAVA_DIRECTIVE_HEADERS = map[string]struct{}{
LAVA_DEBUG_RELAY: {},
}

type UserData struct {
ConsumerIp string
DappId string
}

type NodeUrl struct {
Url string `yaml:"url,omitempty" json:"url,omitempty" mapstructure:"url"`
InternalPath string `yaml:"internal-path,omitempty" json:"internal-path,omitempty" mapstructure:"internal-path"`
Expand Down Expand Up @@ -314,7 +319,7 @@ func GetTokenFromGrpcContext(ctx context.Context) string {
return ""
}

func GetUniqueToken(consumerAddress string, ip string) string {
data := []byte(consumerAddress + ip)
func GetUniqueToken(userData UserData) string {
data := []byte(userData.DappId + userData.ConsumerIp)
return base64.StdEncoding.EncodeToString(sigs.HashMsg(data))
}
13 changes: 7 additions & 6 deletions protocol/rpcconsumer/consumer_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/dgraph-io/ristretto"
common "github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/utils"
)

Expand Down Expand Up @@ -40,8 +41,8 @@ func (cc *ConsumerConsistency) getLatestBlock(key string) (block int64, found bo
return block, found
}

func (cc *ConsumerConsistency) Key(dappId string, ip string) string {
return dappId + "__" + ip
func (cc *ConsumerConsistency) Key(userData common.UserData) string {
return userData.DappId + "__" + userData.ConsumerIp
}

// used on subscription, where we already have the dapp key stored, but we don't keep the dappId and ip separately
Expand All @@ -55,19 +56,19 @@ func (cc *ConsumerConsistency) SetSeenBlockFromKey(blockSeen int64, key string)
}
}

func (cc *ConsumerConsistency) SetSeenBlock(blockSeen int64, dappId string, ip string) {
func (cc *ConsumerConsistency) SetSeenBlock(blockSeen int64, userData common.UserData) {
if cc == nil {
return
}
key := cc.Key(dappId, ip)
key := cc.Key(userData)
cc.SetSeenBlockFromKey(blockSeen, key)
}

func (cc *ConsumerConsistency) GetSeenBlock(dappId string, ip string) (int64, bool) {
func (cc *ConsumerConsistency) GetSeenBlock(userData common.UserData) (int64, bool) {
if cc == nil {
return 0, false
}
return cc.getLatestBlock(cc.Key(dappId, ip))
return cc.getLatestBlock(cc.Key(userData))
}

func NewConsumerConsistency(specId string) *ConsumerConsistency {
Expand Down
Loading

0 comments on commit 0d1806d

Please sign in to comment.