From 0d1806dd2e2155d08eecb28ccbf7a8f50ce4f6b8 Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:58:05 +0200 Subject: [PATCH] feat: PRT - dappid and consumer ip added to protocol message. (#1643) * feat: PRT - dappid and consumer ip added to protocol message. * fix test * fixing comments * fix test * adding comment for wrapping protocol message. * add user data everywhere * fix me * fix lint * fix all mocked tests --------- Co-authored-by: Omer <100387053+omerlavanet@users.noreply.github.com> --- protocol/chainlib/chainlib.go | 4 +- protocol/chainlib/chainlib_mock.go | 16 ++--- .../chainlib/consumer_websocket_manager.go | 2 +- .../consumer_ws_subscription_manager.go | 17 ++--- .../consumer_ws_subscription_manager_test.go | 51 ++++++++------- protocol/chainlib/protocol_message.go | 9 ++- protocol/common/endpoints.go | 9 ++- protocol/rpcconsumer/consumer_consistency.go | 13 ++-- .../rpcconsumer/consumer_consistency_test.go | 16 +++-- protocol/rpcconsumer/relay_processor.go | 33 ++++------ protocol/rpcconsumer/relay_processor_test.go | 54 +++++++++++---- protocol/rpcconsumer/rpcconsumer_server.go | 65 ++++++++++--------- protocol/rpcprovider/rpcprovider_server.go | 4 +- 13 files changed, 168 insertions(+), 125 deletions(-) diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 8ed037669d..cbd5881b52 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -128,12 +128,10 @@ type RelaySender interface { ) (ProtocolMessage, error) SendParsedRelay( ctx context.Context, - dappID string, - consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage, ) (relayResult *common.RelayResult, errRet error) - CreateDappKey(dappID, consumerIp string) string + CreateDappKey(userData common.UserData) string CancelSubscriptionContext(subscriptionKey string) SetConsistencySeenBlock(blockSeen int64, key string) } diff --git a/protocol/chainlib/chainlib_mock.go b/protocol/chainlib/chainlib_mock.go index 757c2cd9e0..a8951760ae 100644 --- a/protocol/chainlib/chainlib_mock.go +++ b/protocol/chainlib/chainlib_mock.go @@ -692,17 +692,17 @@ func (mr *MockRelaySenderMockRecorder) CancelSubscriptionContext(subscriptionKey } // CreateDappKey mocks base method. -func (m *MockRelaySender) CreateDappKey(dappID, consumerIp string) string { +func (m *MockRelaySender) CreateDappKey(userData common.UserData) string { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateDappKey", dappID, consumerIp) + ret := m.ctrl.Call(m, "CreateDappKey", userData) ret0, _ := ret[0].(string) return ret0 } // CreateDappKey indicates an expected call of CreateDappKey. -func (mr *MockRelaySenderMockRecorder) CreateDappKey(dappID, consumerIp interface{}) *gomock.Call { +func (mr *MockRelaySenderMockRecorder) CreateDappKey(userData interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDappKey", reflect.TypeOf((*MockRelaySender)(nil).CreateDappKey), dappID, consumerIp) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDappKey", reflect.TypeOf((*MockRelaySender)(nil).CreateDappKey), userData) } // ParseRelay mocks base method. @@ -721,18 +721,18 @@ func (mr *MockRelaySenderMockRecorder) ParseRelay(ctx, url, req, connectionType, } // SendParsedRelay mocks base method. 
-func (m *MockRelaySender) SendParsedRelay(ctx context.Context, dappID, consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (*common.RelayResult, error) { +func (m *MockRelaySender) SendParsedRelay(ctx context.Context, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (*common.RelayResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendParsedRelay", ctx, dappID, consumerIp, analytics, protocolMessage) + ret := m.ctrl.Call(m, "SendParsedRelay", ctx, analytics, protocolMessage) ret0, _ := ret[0].(*common.RelayResult) ret1, _ := ret[1].(error) return ret0, ret1 } // SendParsedRelay indicates an expected call of SendParsedRelay. -func (mr *MockRelaySenderMockRecorder) SendParsedRelay(ctx, dappID, consumerIp, analytics, protocolMessage interface{}) *gomock.Call { +func (mr *MockRelaySenderMockRecorder) SendParsedRelay(ctx, analytics, protocolMessage interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendParsedRelay", reflect.TypeOf((*MockRelaySender)(nil).SendParsedRelay), ctx, dappID, consumerIp, analytics, protocolMessage) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendParsedRelay", reflect.TypeOf((*MockRelaySender)(nil).SendParsedRelay), ctx, analytics, protocolMessage) } // SendRelay mocks base method. diff --git a/protocol/chainlib/consumer_websocket_manager.go b/protocol/chainlib/consumer_websocket_manager.go index 75dd3ca1ba..aefb3878bc 100644 --- a/protocol/chainlib/consumer_websocket_manager.go +++ b/protocol/chainlib/consumer_websocket_manager.go @@ -182,7 +182,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() { continue } else { // Normal relay over websocket. (not subscription related) - relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, dappID, userIp, metricsData, protocolMessage) + relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, metricsData, protocolMessage) if err != nil { formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not send parsed relay", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime)) if formatterMsg != nil { diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index dda1405573..17ed28f913 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -301,7 +301,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( utils.LogAttr("dappKey", dappKey), ) - relayResult, err := cwsm.relaySender.SendParsedRelay(webSocketCtx, dappID, consumerIp, metricsData, protocolMessage) + relayResult, err := cwsm.relaySender.SendParsedRelay(webSocketCtx, metricsData, protocolMessage) if err != nil { onSubscriptionFailure() return nil, nil, utils.LavaFormatError("could not send subscription relay", err) @@ -490,7 +490,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( } unsubscribeRelayCtx := utils.WithUniqueIdentifier(context.Background(), utils.GenerateUniqueIdentifier()) - err = cwsm.sendUnsubscribeMessage(unsubscribeRelayCtx, dappID, userIp, protocolMessage, metricsData) + err = cwsm.sendUnsubscribeMessage(unsubscribeRelayCtx, protocolMessage, metricsData) if err != nil { utils.LavaFormatError("could not send unsubscribe message due to a relay error", err, @@ -702,15 +702,16 @@ func (cwsm *ConsumerWSSubscriptionManager) 
craftUnsubscribeMessage(hashedParams, return protocolMessage, nil } -func (cwsm *ConsumerWSSubscriptionManager) sendUnsubscribeMessage(ctx context.Context, dappID, consumerIp string, protocolMessage ProtocolMessage, metricsData *metrics.RelayMetrics) error { +func (cwsm *ConsumerWSSubscriptionManager) sendUnsubscribeMessage(ctx context.Context, protocolMessage ProtocolMessage, metricsData *metrics.RelayMetrics) error { // Send the crafted unsubscribe relay + userData := protocolMessage.GetUserData() utils.LavaFormatTrace("sending unsubscribe relay", utils.LogAttr("GUID", ctx), - utils.LogAttr("dappID", dappID), - utils.LogAttr("consumerIp", consumerIp), + utils.LogAttr("dappID", userData.DappId), + utils.LogAttr("consumerIp", userData.ConsumerIp), ) - _, err := cwsm.relaySender.SendParsedRelay(ctx, dappID, consumerIp, metricsData, protocolMessage) + _, err := cwsm.relaySender.SendParsedRelay(ctx, metricsData, protocolMessage) if err != nil { return utils.LavaFormatError("could not send unsubscribe relay", err) } @@ -733,8 +734,8 @@ func (cwsm *ConsumerWSSubscriptionManager) connectDappWithSubscription(dappKey s cwsm.connectedDapps[dappKey][hashedParams] = webSocketChan } -func (cwsm *ConsumerWSSubscriptionManager) CreateWebSocketConnectionUniqueKey(dappID, consumerIp, webSocketConnectionUniqueId string) string { - return cwsm.relaySender.CreateDappKey(dappID, consumerIp) + "__" + webSocketConnectionUniqueId +func (cwsm *ConsumerWSSubscriptionManager) CreateWebSocketConnectionUniqueKey(dappID, consumerIp string, webSocketConnectionUniqueId string) string { + return cwsm.relaySender.CreateDappKey(common.UserData{DappId: dappID, ConsumerIp: consumerIp}) + "__" + webSocketConnectionUniqueId } func (cwsm *ConsumerWSSubscriptionManager) UnsubscribeAll(webSocketCtx context.Context, dappID, consumerIp string, webSocketConnectionUniqueId string, metricsData *metrics.RelayMetrics) error { diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index 81a59fea87..10d718cc59 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -67,13 +67,13 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes chainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil) + protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil, dapp, ip) relaySender := NewMockRelaySender(ctrl) relaySender. EXPECT(). - CreateDappKey(gomock.Any(), gomock.Any()). - DoAndReturn(func(dappID, consumerIp string) string { - return dappID + consumerIp + CreateDappKey(gomock.Any()). + DoAndReturn(func(userData common.UserData) string { + return userData.DappId + userData.ConsumerIp }). AnyTimes() relaySender. @@ -129,7 +129,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). 
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -224,13 +224,13 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { chainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil) + protocolMessage1 := NewProtocolMessage(chainMessage1, nil, nil, dapp, "") relaySender := NewMockRelaySender(ctrl) relaySender. EXPECT(). - CreateDappKey(gomock.Any(), gomock.Any()). - DoAndReturn(func(dappID, consumerIp string) string { - return dappID + consumerIp + CreateDappKey(gomock.Any()). + DoAndReturn(func(userData common.UserData) string { + return userData.DappId + userData.ConsumerIp }). AnyTimes() relaySender. @@ -286,7 +286,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -430,14 +430,15 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { subscribeChainMessage1, err := chainParser.ParseMsg("", play.subscriptionRequestData1, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - subscribeProtocolMessage1 := NewProtocolMessage(subscribeChainMessage1, nil, nil) + subscribeProtocolMessage1 := NewProtocolMessage(subscribeChainMessage1, nil, nil, dapp1, ts.Consumer.Addr.String()) unsubscribeProtocolMessage1 := NewProtocolMessage(unsubscribeChainMessage1, nil, &pairingtypes.RelayPrivateData{ Data: play.unsubscribeMessage1, - }) + }, dapp1, ts.Consumer.Addr.String(), + ) relaySender := NewMockRelaySender(ctrl) relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool { + SendParsedRelay(gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool { protocolMsg, ok := x.(ProtocolMessage) require.True(t, ok) require.NotNil(t, protocolMsg) @@ -455,9 +456,9 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). - CreateDappKey(gomock.Any(), gomock.Any()). - DoAndReturn(func(dappID, consumerIp string) string { - return dappID + consumerIp + CreateDappKey(gomock.Any()). + DoAndReturn(func(userData common.UserData) string { + return userData.DappId + userData.ConsumerIp }). AnyTimes() @@ -530,7 +531,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -552,7 +553,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). 
Times(0) // Should not call SendParsedRelay, because it is already subscribed @@ -578,10 +579,10 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Prepare for the next subscription unsubscribeChainMessage2, err := chainParser.ParseMsg("", play.unsubscribeMessage2, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - unsubscribeProtocolMessage2 := NewProtocolMessage(unsubscribeChainMessage2, nil, &pairingtypes.RelayPrivateData{Data: play.unsubscribeMessage2}) + unsubscribeProtocolMessage2 := NewProtocolMessage(unsubscribeChainMessage2, nil, &pairingtypes.RelayPrivateData{Data: play.unsubscribeMessage2}, dapp2, ts.Consumer.Addr.String()) subscribeChainMessage2, err := chainParser.ParseMsg("", play.subscriptionRequestData2, play.connectionType, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - subscribeProtocolMessage2 := NewProtocolMessage(subscribeChainMessage2, nil, nil) + subscribeProtocolMessage2 := NewProtocolMessage(subscribeChainMessage2, nil, nil, dapp2, ts.Consumer.Addr.String()) relaySender. EXPECT(). ParseRelay(gomock.Any(), gomock.Any(), gomockuber.Cond(func(x any) bool { @@ -644,7 +645,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult2, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe @@ -664,12 +665,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Prepare for unsubscribe from the first subscription relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). Return(relayResult1, nil). Times(0) // Should not call SendParsedRelay, because another dapp is still subscribed ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData) + unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String()) err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil) require.NoError(t, err) @@ -688,8 +689,8 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Prepare for unsubscribe from the second subscription relaySender. EXPECT(). - SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(ctx context.Context, dappID string, consumerIp string, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (relayResult *common.RelayResult, errRet error) { + SendParsedRelay(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, analytics *metrics.RelayMetrics, protocolMessage ProtocolMessage) (relayResult *common.RelayResult, errRet error) { wg.Done() return relayResult2, nil }). 
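The core of this change lands in protocol_message.go below: a protocol message now carries a common.UserData value (DappId plus ConsumerIp) and exposes it through GetUserData(), so user identity no longer has to ride along as separate dappID/consumerIp parameters on every call. A minimal self-contained sketch of the pattern follows; it uses local stand-in types rather than the repo's actual chainlib and common packages, and the dapp id and ip values are placeholders:

package main

import "fmt"

// Stand-in for protocol/common.UserData introduced by this patch.
type UserData struct {
	ConsumerIp string
	DappId     string
}

// Stand-in for chainlib.BaseProtocolMessage; the real struct also embeds
// the ChainMessage, directive headers, and relay request data.
type BaseProtocolMessage struct {
	userData UserData
}

func (bpm *BaseProtocolMessage) GetUserData() UserData {
	return bpm.userData
}

// Mirrors the reworked constructor shape: identity is supplied once,
// at message creation time.
func NewProtocolMessage(dappId, consumerIp string) *BaseProtocolMessage {
	return &BaseProtocolMessage{userData: UserData{DappId: dappId, ConsumerIp: consumerIp}}
}

func main() {
	msg := NewProtocolMessage("/dapp-id/", "127.0.0.1:443")
	// Downstream code (SendParsedRelay, RelayProcessor, logging) recovers
	// identity from the message instead of taking extra arguments.
	fmt.Println(msg.GetUserData().DappId, msg.GetUserData().ConsumerIp)
}

This is why SendParsedRelay, sendRelayToProvider, and NewRelayProcessor all drop their dappID/consumerIp parameters in the hunks above and below: the message itself is now the single source of truth for user data.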
diff --git a/protocol/chainlib/protocol_message.go b/protocol/chainlib/protocol_message.go index 9a3313e07e..0f6777a04f 100644 --- a/protocol/chainlib/protocol_message.go +++ b/protocol/chainlib/protocol_message.go @@ -11,6 +11,11 @@ type BaseProtocolMessage struct { ChainMessage directiveHeaders map[string]string relayRequestData *pairingtypes.RelayPrivateData + userData common.UserData +} + +func (bpm *BaseProtocolMessage) GetUserData() common.UserData { + return bpm.userData } func (bpm *BaseProtocolMessage) GetDirectiveHeaders() map[string]string { @@ -39,11 +44,12 @@ func (bpm *BaseProtocolMessage) GetBlockedProviders() []string { return nil } -func NewProtocolMessage(chainMessage ChainMessage, directiveHeaders map[string]string, relayRequestData *pairingtypes.RelayPrivateData) ProtocolMessage { +func NewProtocolMessage(chainMessage ChainMessage, directiveHeaders map[string]string, relayRequestData *pairingtypes.RelayPrivateData, dappId, consumerIp string) ProtocolMessage { return &BaseProtocolMessage{ ChainMessage: chainMessage, directiveHeaders: directiveHeaders, relayRequestData: relayRequestData, + userData: common.UserData{DappId: dappId, ConsumerIp: consumerIp}, } } @@ -53,4 +59,5 @@ type ProtocolMessage interface { RelayPrivateData() *pairingtypes.RelayPrivateData HashCacheRequest(chainId string) ([]byte, func([]byte) []byte, error) GetBlockedProviders() []string + GetUserData() common.UserData } diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 33de581e75..a5dce43136 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -49,6 +49,11 @@ var SPECIAL_LAVA_DIRECTIVE_HEADERS = map[string]struct{}{ LAVA_DEBUG_RELAY: {}, } +type UserData struct { + ConsumerIp string + DappId string +} + type NodeUrl struct { Url string `yaml:"url,omitempty" json:"url,omitempty" mapstructure:"url"` InternalPath string `yaml:"internal-path,omitempty" json:"internal-path,omitempty" mapstructure:"internal-path"` @@ -314,7 +319,7 @@ func GetTokenFromGrpcContext(ctx context.Context) string { return "" } -func GetUniqueToken(consumerAddress string, ip string) string { - data := []byte(consumerAddress + ip) +func GetUniqueToken(userData UserData) string { + data := []byte(userData.DappId + userData.ConsumerIp) return base64.StdEncoding.EncodeToString(sigs.HashMsg(data)) } diff --git a/protocol/rpcconsumer/consumer_consistency.go b/protocol/rpcconsumer/consumer_consistency.go index dc54667e44..30c2decfd3 100644 --- a/protocol/rpcconsumer/consumer_consistency.go +++ b/protocol/rpcconsumer/consumer_consistency.go @@ -4,6 +4,7 @@ import ( "time" "github.com/dgraph-io/ristretto" + common "github.com/lavanet/lava/v2/protocol/common" "github.com/lavanet/lava/v2/utils" ) @@ -40,8 +41,8 @@ func (cc *ConsumerConsistency) getLatestBlock(key string) (block int64, found bo return block, found } -func (cc *ConsumerConsistency) Key(dappId string, ip string) string { - return dappId + "__" + ip +func (cc *ConsumerConsistency) Key(userData common.UserData) string { + return userData.DappId + "__" + userData.ConsumerIp } // used on subscription, where we already have the dapp key stored, but we don't keep the dappId and ip separately @@ -55,19 +56,19 @@ func (cc *ConsumerConsistency) SetSeenBlockFromKey(blockSeen int64, key string) } } -func (cc *ConsumerConsistency) SetSeenBlock(blockSeen int64, dappId string, ip string) { +func (cc *ConsumerConsistency) SetSeenBlock(blockSeen int64, userData common.UserData) { if cc == nil { return } - key := cc.Key(dappId, ip) + key := 
cc.Key(userData) cc.SetSeenBlockFromKey(blockSeen, key) } -func (cc *ConsumerConsistency) GetSeenBlock(dappId string, ip string) (int64, bool) { +func (cc *ConsumerConsistency) GetSeenBlock(userData common.UserData) (int64, bool) { if cc == nil { return 0, false } - return cc.getLatestBlock(cc.Key(dappId, ip)) + return cc.getLatestBlock(cc.Key(userData)) } func NewConsumerConsistency(specId string) *ConsumerConsistency { diff --git a/protocol/rpcconsumer/consumer_consistency_test.go b/protocol/rpcconsumer/consumer_consistency_test.go index b58717bcb4..82263e12df 100644 --- a/protocol/rpcconsumer/consumer_consistency_test.go +++ b/protocol/rpcconsumer/consumer_consistency_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + common "github.com/lavanet/lava/v2/protocol/common" "github.com/stretchr/testify/require" ) @@ -35,21 +36,24 @@ func TestBasic(t *testing.T) { dappid_other := "/77777/" ip_other := "2.1.1.1:443" + userDataOne := common.UserData{DappId: dappid, ConsumerIp: ip} + userDataOther := common.UserData{DappId: dappid_other, ConsumerIp: ip_other} + for i := 1; i < 100; i++ { - consumerConsistency.SetSeenBlock(int64(i), dappid, ip) + consumerConsistency.SetSeenBlock(int64(i), userDataOne) time.Sleep(4 * time.Millisecond) // need to let each set finish } - consumerConsistency.SetSeenBlock(5, dappid_other, ip_other) + consumerConsistency.SetSeenBlock(5, userDataOther) time.Sleep(4 * time.Millisecond) // try to set older values and discard them - consumerConsistency.SetSeenBlock(3, dappid_other, ip_other) + consumerConsistency.SetSeenBlock(3, userDataOther) time.Sleep(4 * time.Millisecond) - consumerConsistency.SetSeenBlock(3, dappid, ip) + consumerConsistency.SetSeenBlock(3, userDataOne) time.Sleep(4 * time.Millisecond) - block, found := consumerConsistency.GetSeenBlock(dappid, ip) + block, found := consumerConsistency.GetSeenBlock(userDataOne) require.True(t, found) require.Equal(t, int64(99), block) - block, found = consumerConsistency.GetSeenBlock(dappid_other, ip_other) + block, found = consumerConsistency.GetSeenBlock(userDataOther) require.True(t, found) require.Equal(t, int64(5), block) } diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index f18b276663..02a1600ebe 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -48,12 +48,10 @@ type RelayProcessor struct { protocolResponseErrors RelayErrors successResults []common.RelayResult lock sync.RWMutex - chainMessage chainlib.ChainMessage + protocolMessage chainlib.ProtocolMessage guid uint64 selection Selection consumerConsistency *ConsumerConsistency - dappID string - consumerIp string skipDataReliability bool debugRelay bool allowSessionDegradation uint32 // used in the scenario where extension was previously used. 
@@ -67,10 +65,8 @@ func NewRelayProcessor( ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, - chainMessage chainlib.ChainMessage, + protocolMessage chainlib.ProtocolMessage, consumerConsistency *ConsumerConsistency, - dappID string, - consumerIp string, debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, @@ -79,7 +75,7 @@ ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) selection := Quorum // select the majority of node responses - if chainlib.GetStateful(chainMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS { + if chainlib.GetStateful(protocolMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS { selection = BestResult // select the majority of node successes } if requiredSuccesses <= 0 { @@ -91,12 +87,10 @@ responses: make(chan *relayResponse, MaxCallsPerRelay), // we set it as buffered so it is not blocking nodeResponseErrors: RelayErrors{relayErrors: []RelayError{}}, protocolResponseErrors: RelayErrors{relayErrors: []RelayError{}, onFailureMergeAll: true}, - chainMessage: chainMessage, + protocolMessage: protocolMessage, guid: guid, selection: selection, consumerConsistency: consumerConsistency, - dappID: dappID, - consumerIp: consumerIp, debugRelay: debugRelay, metricsInf: metricsInf, chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter, @@ -213,7 +207,7 @@ func (rp *RelayProcessor) setValidResponse(response *relayResponse) { // future relay requests and data reliability requests need to ask for the same specific block height to get consensus on the reply // we do not modify the chain message data on the consumer, only its requested block, so we let the provider know it can't put any block height it wants by setting a specific block height - reqBlock, _ := rp.chainMessage.RequestedBlock() + reqBlock, _ := rp.protocolMessage.RequestedBlock() if reqBlock == spectypes.LATEST_BLOCK { // TODO: when we turn on dataReliability on latest call UpdateLatest, until then we turn it off always // modifiedOnLatestReq := rp.chainMessage.UpdateLatestBlockInMessage(response.relayResult.Reply.LatestBlock, false) @@ -235,15 +229,16 @@ // no error, update the seen block blockSeen := response.relayResult.Reply.LatestBlock // nil safe - rp.consumerConsistency.SetSeenBlock(blockSeen, rp.dappID, rp.consumerIp) + userData := rp.protocolMessage.GetUserData() + rp.consumerConsistency.SetSeenBlock(blockSeen, userData) // on subscribe results, we just append to successful results instead of parsing results because we already have a validation. - if chainlib.IsFunctionTagOfType(rp.chainMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { + if chainlib.IsFunctionTagOfType(rp.protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { rp.successResults = append(rp.successResults, response.relayResult) return } // check response error - foundError, errorMessage := rp.chainMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode) + foundError, errorMessage := rp.protocolMessage.CheckResponseError(response.relayResult.Reply.Data, response.relayResult.StatusCode) if foundError { // this is a node error, meaning we still didn't get a good response. 
// we may choose to wait until there will be a response or timeout happens @@ -253,7 +248,7 @@ func (rp *RelayProcessor) setValidResponse(response *relayResponse) { // send relay error metrics only on non stateful queries, as stateful queries always return X-1/X errors. if rp.selection != BestResult { go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) - utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", err), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.chainMessage.GetApi().Name), utils.LogAttr("requested_block", reqBlock)) + utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", err), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.protocolMessage.GetApi().Name), utils.LogAttr("requested_block", reqBlock)) } return } @@ -305,7 +300,7 @@ func (rp *RelayProcessor) HasResults() bool { } func (rp *RelayProcessor) getInputMsgInfoHashString() (string, error) { - hash, err := rp.chainMessage.GetRawRequestHash() + hash, err := rp.protocolMessage.GetRawRequestHash() hashString := "" if err == nil { hashString = string(hash) @@ -336,8 +331,8 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node // We failed enough times. we need to add this to our hash map so we don't waste time on it again. chainId, apiInterface := rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface() utils.LavaFormatWarning("Failed to recover retries on node errors, might be an invalid input", nil, - utils.LogAttr("api", rp.chainMessage.GetApi().Name), - utils.LogAttr("params", rp.chainMessage.GetRPCMessage().GetParams()), + utils.LogAttr("api", rp.protocolMessage.GetApi().Name), + utils.LogAttr("params", rp.protocolMessage.GetRPCMessage().GetParams()), utils.LogAttr("chainId", chainId), utils.LogAttr("apiInterface", apiInterface), utils.LogAttr("hash", hash), @@ -436,7 +431,7 @@ func (rp *RelayProcessor) responsesQuorum(results []common.RelayResult, quorumSi return nil, errors.New("quorumSize must be greater than zero") } countMap := make(map[string]int) // Map to store the count of each unique result.Reply.Data - deterministic := rp.chainMessage.GetApi().Category.Deterministic + deterministic := rp.protocolMessage.GetApi().Category.Deterministic var bestQosResult common.RelayResult bestQos := sdktypes.ZeroDec() nilReplies := 0 diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index bb21a8eda3..7f9936fa85 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -44,7 +44,7 @@ func sendSuccessResp(relayProcessor *RelayProcessor, provider string, delay time RelaySession: &pairingtypes.RelaySession{}, RelayData: &pairingtypes.RelayPrivateData{}, }, - Reply: &pairingtypes.RelayReply{Data: []byte("ok")}, + Reply: &pairingtypes.RelayReply{Data: []byte("ok"), LatestBlock: 1}, ProviderInfo: common.ProviderInfo{ProviderAddress: provider}, StatusCode: http.StatusOK, }, @@ -104,7 +104,11 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, 
relayRetriesManagerInstance) + dappId := "dapp" + consumerIp := "123.11" + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, dappId, consumerIp) + consistency := NewConsumerConsistency(specId) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, consistency, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -125,9 +129,22 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.True(t, resultsOk) protocolErrors := relayProcessor.ProtocolErrors() require.Zero(t, protocolErrors) + returnedResult, err := relayProcessor.ProcessingResult() require.NoError(t, err) require.Equal(t, string(returnedResult.Reply.Data), "ok") + var seenBlock int64 + var found bool + // wait for cache to be added asynchronously + for i := 0; i < 10; i++ { + seenBlock, found = consistency.GetSeenBlock(protocolMessage.GetUserData()) + if found { + break + } + time.Sleep(10 * time.Millisecond) + } + require.True(t, found) + require.Equal(t, seenBlock, int64(1)) }) } @@ -146,7 +163,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -188,7 +206,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -212,7 +231,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor = NewRelayProcessor(ctx, 
lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -236,7 +256,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // 4th relay, same inputs, this time a successful relay, should remove the hash from the map chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage = chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -285,7 +306,8 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) relayProcessor.disableRelayRetry = true usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -323,7 +345,8 @@ func TestRelayProcessorTimeout(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -375,7 +398,8 @@ func TestRelayProcessorRetry(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + 
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -419,7 +443,8 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -464,7 +489,8 @@ func TestRelayProcessorStatefulApi(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -509,7 +535,8 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -553,7 +580,8 @@ func TestRelayProcessorLatest(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "") + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 
1, protocolMessage, nil, false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 6824821a41..345e094293 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -43,6 +43,8 @@ const ( MaxRelayRetries = 6 SendRelayAttempts = 3 numberOfTimesToCheckCurrentlyUsedIsEmpty = 3 + initRelaysDappId = "-init-" + initRelaysConsumerIp = "" ) var NoResponseTimeout = sdkerrors.New("NoResponseTimeout Error", 685, "timeout occurred while waiting for providers responses") @@ -227,7 +229,7 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, protocolMessage chainlib.ProtocolMessage) (bool, error) { success := false var err error - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) usedProvidersResets := 1 for i := 0; i < retries; i++ { // Check if we even have enough providers to communicate with them all. @@ -237,11 +239,11 @@ func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retrie usedProvidersResets++ relayProcessor.GetUsedProviders().ClearUnwanted() } - err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil) + err = rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessor, nil) if lavasession.PairingListEmptyError.Is(err) { // we don't have pairings anymore, could be related to unwanted providers relayProcessor.GetUsedProviders().ClearUnwanted() - err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil) + err = rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessor, nil) } if err != nil { utils.LavaFormatError("[-] failed sending init relay", err, []utils.Attribute{{Key: "chainID", Value: rpccs.listenEndpoint.ChainID}, {Key: "APIInterface", Value: rpccs.listenEndpoint.ApiInterface}, {Key: "relayProcessor", Value: relayProcessor}}...) 
@@ -294,7 +296,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelays(retries int, initialRelays boo } return false, err } - protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relay) + protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relay, initRelaysDappId, initRelaysConsumerIp) return rpccs.sendRelayWithRetries(ctx, retries, initialRelays, protocolMessage) } @@ -322,7 +324,7 @@ func (rpccs *RPCConsumerServer) SendRelay( return nil, err } - return rpccs.SendParsedRelay(ctx, dappID, consumerIp, analytics, protocolMessage) + return rpccs.SendParsedRelay(ctx, analytics, protocolMessage) } func (rpccs *RPCConsumerServer) ParseRelay( @@ -350,20 +352,18 @@ func (rpccs *RPCConsumerServer) ParseRelay( // do this in a loop with retry attempts, configurable via a flag, limited by the number of providers in CSM reqBlock, _ := chainMessage.RequestedBlock() - seenBlock, _ := rpccs.consumerConsistency.GetSeenBlock(dappID, consumerIp) + seenBlock, _ := rpccs.consumerConsistency.GetSeenBlock(common.UserData{DappId: dappID, ConsumerIp: consumerIp}) if seenBlock < 0 { seenBlock = 0 } relayRequestData := lavaprotocol.NewRelayData(ctx, connectionType, url, []byte(req), seenBlock, reqBlock, rpccs.listenEndpoint.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), chainlib.GetAddon(chainMessage), common.GetExtensionNames(chainMessage.GetExtensions())) - protocolMessage = chainlib.NewProtocolMessage(chainMessage, directiveHeaders, relayRequestData) + protocolMessage = chainlib.NewProtocolMessage(chainMessage, directiveHeaders, relayRequestData, dappID, consumerIp) return protocolMessage, nil } func (rpccs *RPCConsumerServer) SendParsedRelay( ctx context.Context, - dappID string, - consumerIp string, analytics *metrics.RelayMetrics, protocolMessage chainlib.ProtocolMessage, ) (relayResult *common.RelayResult, errRet error) { @@ -373,10 +373,11 @@ func (rpccs *RPCConsumerServer) SendParsedRelay( // asynchronously sends data reliability if necessary relaySentTime := time.Now() - relayProcessor, err := rpccs.ProcessRelaySend(ctx, protocolMessage, dappID, consumerIp, analytics) + relayProcessor, err := rpccs.ProcessRelaySend(ctx, protocolMessage, analytics) if err != nil && !relayProcessor.HasResults() { + userData := protocolMessage.GetUserData() // we can't send anymore, and we don't have any responses - utils.LavaFormatError("failed getting responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key()), utils.LogAttr("userIp", consumerIp), utils.LogAttr("relayProcessor", relayProcessor)) + utils.LavaFormatError("failed getting responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key()), utils.LogAttr("userIp", userData.ConsumerIp), utils.LogAttr("relayProcessor", relayProcessor)) return nil, err } @@ -391,7 +392,7 @@ func (rpccs *RPCConsumerServer) SendParsedRelay( if found { dataReliabilityContext = utils.WithUniqueIdentifier(dataReliabilityContext, guid) } - go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, dappID, consumerIp, protocolMessage, dataReliabilityThreshold, relayProcessor) // runs asynchronously + go rpccs.sendDataReliabilityRelayIfApplicable(dataReliabilityContext, protocolMessage, dataReliabilityThreshold, relayProcessor) // runs asynchronously } returnedResult, err := relayProcessor.ProcessingResult() @@ -415,11 +416,11 @@ func (rpccs *RPCConsumerServer) GetChainIdAndApiInterface() (string, string) { return 
rpccs.listenEndpoint.ChainID, rpccs.listenEndpoint.ApiInterface } -func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMessage chainlib.ProtocolMessage, dappID string, consumerIp string, analytics *metrics.RelayMetrics) (*RelayProcessor, error) { +func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMessage chainlib.ProtocolMessage, analytics *metrics.RelayMetrics) (*RelayProcessor, error) { // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) defer cancel() - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) var err error // try sending a relay 3 times. if failed return the error for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ { @@ -427,7 +428,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe if analytics != nil && retryFirstRelayAttempt > 0 { analytics = nil } - err = rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessor, analytics) + err = rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessor, analytics) // check if we had an error. if we did, try again. if err == nil { @@ -494,7 +495,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe return relayProcessor, nil } // otherwise continue sending another relay - err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, dappID, consumerIp, relayProcessor, nil) + err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, relayProcessor, nil) go validateReturnCondition(err) go readResultsFromProcessor() // increase number of retries launched only if we still have pairing available, if we exhausted the list we don't want to break early @@ -507,7 +508,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe if relayProcessor.ShouldRetry(numberOfRetriesLaunched) { // limit the number of retries called from the new batch ticker flow. // if we pass the limit we just wait for the relays we sent to return. - err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, dappID, consumerIp, relayProcessor, nil) + err := rpccs.sendRelayToProvider(processingCtx, protocolMessage, relayProcessor, nil) go validateReturnCondition(err) // add ticker launch metrics go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.GetChainIdAndApiInterface()) @@ -528,10 +529,11 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe return relayProcessor, returnErr case <-processingCtx.Done(): // in case we got a processing timeout we return context deadline exceeded to the user. 
+ userData := protocolMessage.GetUserData() utils.LavaFormatWarning("Relay Got processingCtx timeout", nil, utils.LogAttr("processingTimeout", processingTimeout), - utils.LogAttr("dappId", dappID), - utils.LogAttr("consumerIp", consumerIp), + utils.LogAttr("dappId", userData.DappId), + utils.LogAttr("consumerIp", userData.ConsumerIp), utils.LogAttr("protocolMessage.GetApi().Name", protocolMessage.GetApi().Name), utils.LogAttr("GUID", ctx), utils.LogAttr("relayProcessor", relayProcessor), @@ -541,8 +543,8 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe } } -func (rpccs *RPCConsumerServer) CreateDappKey(dappID, consumerIp string) string { - return rpccs.consumerConsistency.Key(dappID, consumerIp) +func (rpccs *RPCConsumerServer) CreateDappKey(userData common.UserData) string { + return rpccs.consumerConsistency.Key(userData) } func (rpccs *RPCConsumerServer) CancelSubscriptionContext(subscriptionKey string) { @@ -562,8 +564,6 @@ func (rpccs *RPCConsumerServer) CancelSubscriptionContext(subscriptionKey string func (rpccs *RPCConsumerServer) sendRelayToProvider( ctx context.Context, protocolMessage chainlib.ProtocolMessage, - dappID string, - consumerIp string, relayProcessor *RelayProcessor, analytics *metrics.RelayMetrics, ) (errRet error) { @@ -578,9 +578,10 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // if necessary send detection tx for hashes consensus mismatch // handle QoS updates // in case connection totally fails, update unresponsive providers in ConsumerSessionManager + userData := protocolMessage.GetUserData() var sharedStateId string // defaults to "", if shared state is disabled then no shared state will be used. if rpccs.sharedState { - sharedStateId = rpccs.consumerConsistency.Key(dappID, consumerIp) // use same key as we use for consistency, (for better consistency :-D) + sharedStateId = rpccs.consumerConsistency.Key(userData) // use same key as we use for consistency, (for better consistency :-D) } privKey := rpccs.privKey @@ -621,7 +622,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( utils.LavaFormatDebug("shared state seen block is newer", utils.LogAttr("cache_seen_block", cacheSeenBlock), utils.LogAttr("local_seen_block", protocolMessage.RelayPrivateData().SeenBlock)) protocolMessage.RelayPrivateData().SeenBlock = cacheSeenBlock // setting the fetched seen block from the cache server to our local cache as well. 
- rpccs.consumerConsistency.SetSeenBlock(cacheSeenBlock, dappID, consumerIp) + rpccs.consumerConsistency.SetSeenBlock(cacheSeenBlock, userData) } // handle cache reply @@ -668,7 +669,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( if err != nil { if lavasession.PairingListEmptyError.Is(err) { if addon != "" { - return utils.LavaFormatError("No Providers For Addon", err, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensions), utils.LogAttr("userIp", consumerIp)) + return utils.LavaFormatError("No Providers For Addon", err, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensions), utils.LogAttr("userIp", userData.ConsumerIp)) } else if len(extensions) > 0 && relayProcessor.GetAllowSessionDegradation() { // if we have no providers for that extension, use a regular provider, otherwise return the extension results sessions, err = rpccs.consumerSessionManager.GetSessions(ctx, chainlib.GetComputeUnits(protocolMessage), usedProviders, reqBlock, addon, []*spectypes.Extension{}, chainlib.GetStateful(protocolMessage), virtualEpoch) if err != nil { @@ -787,7 +788,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } // unique per dappId and ip - consumerToken := common.GetUniqueToken(dappID, consumerIp) + consumerToken := common.GetUniqueToken(userData) processingTimeout, expectedRelayTimeoutForQOS := rpccs.getProcessingTimeout(protocolMessage) deadline, ok := ctx.Deadline() if ok { // we have ctx deadline. we can't go past it. @@ -1200,7 +1201,7 @@ func (rpccs *RPCConsumerServer) getFirstSubscriptionReply(ctx context.Context, h return &reply, nil } -func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, dappID string, consumerIp string, chainMessage chainlib.ChainMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { +func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, chainMessage chainlib.ProtocolMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(chainMessage) // Wait another relayTimeout duration to maybe get additional relay results if relayProcessor.usedProviders.CurrentlyUsed() > 0 { @@ -1240,10 +1241,12 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context } relayResult := results[0] if len(results) < 2 { + userData := chainMessage.GetUserData() relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) - protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) - err := rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessorDataReliability, nil) + // We create a new protocol message from the old one, but with a new instance of relay request data. 
+ protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData, userData.DappId, userData.ConsumerIp) + relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + err := rpccs.sendRelayToProvider(ctx, protocolMessage, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability)) } diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 4de897dd9b..f1f239eea4 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -874,10 +874,10 @@ func (rpcps *RPCProviderServer) sendRelayMessageToNode(ctx context.Context, requ utils.LavaFormatDebug("sending relay to node", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) } // add stickiness header - chainMsg.AppendHeader([]pairingtypes.Metadata{{Name: RPCProviderStickinessHeaderName, Value: common.GetUniqueToken(consumerAddr.String(), common.GetTokenFromGrpcContext(ctx))}}) + chainMsg.AppendHeader([]pairingtypes.Metadata{{Name: RPCProviderStickinessHeaderName, Value: common.GetUniqueToken(common.UserData{DappId: consumerAddr.String(), ConsumerIp: common.GetTokenFromGrpcContext(ctx)})}}) chainMsg.AppendHeader([]pairingtypes.Metadata{{Name: RPCProviderAddressHeader, Value: rpcps.providerAddress.String()}}) if debugConsistency { - utils.LavaFormatDebug("adding stickiness header", utils.LogAttr("tokenFromContext", common.GetTokenFromGrpcContext(ctx)), utils.LogAttr("unique_token", common.GetUniqueToken(consumerAddr.String(), common.GetIpFromGrpcContext(ctx)))) + utils.LavaFormatDebug("adding stickiness header", utils.LogAttr("tokenFromContext", common.GetTokenFromGrpcContext(ctx)), utils.LogAttr("unique_token", common.GetUniqueToken(common.UserData{DappId: consumerAddr.String(), ConsumerIp: common.GetIpFromGrpcContext(ctx)}))) } replyWrapper, _, _, _, _, err := rpcps.chainRouter.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions)
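For reference, the two identity-derived keys this patch reworks can be sketched in isolation. ConsumerConsistency.Key simply joins the pair with "__", while GetUniqueToken hashes and base64-encodes it; the real code hashes via sigs.HashMsg, so the sha256 call below is only an assumed stand-in for illustration, and the sample values are hypothetical:

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// Stand-in for protocol/common.UserData.
type UserData struct {
	ConsumerIp string
	DappId     string
}

// Mirrors ConsumerConsistency.Key after this patch: one seen-block cache
// entry per (dappId, ip) pair.
func consistencyKey(u UserData) string {
	return u.DappId + "__" + u.ConsumerIp
}

// Mirrors the reworked common.GetUniqueToken(userData); sha256 here stands
// in for the repo's sigs.HashMsg.
func getUniqueToken(u UserData) string {
	data := []byte(u.DappId + u.ConsumerIp)
	sum := sha256.Sum256(data)
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	u := UserData{DappId: "/12345/", ConsumerIp: "1.1.1.1:443"}
	fmt.Println(consistencyKey(u)) // "/12345/__1.1.1.1:443"
	fmt.Println(getUniqueToken(u)) // stable stickiness token for this pair
}

Note that on the provider side the patch reuses the same struct with the consumer address in the DappId field and the gRPC token (or ip) in the ConsumerIp field, which preserves the hash input order of the old GetUniqueToken(consumerAddress, ip) signature.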