diff --git a/protocol/chainlib/base_chain_parser.go b/protocol/chainlib/base_chain_parser.go index 301f210ffc..d25e9f175b 100644 --- a/protocol/chainlib/base_chain_parser.go +++ b/protocol/chainlib/base_chain_parser.go @@ -22,6 +22,7 @@ type PolicyInf interface { } type BaseChainParser struct { + internalPaths map[string]struct{} taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer spec spectypes.Spec rwLock sync.RWMutex @@ -226,11 +227,12 @@ func (bcp *BaseChainParser) GetVerifications(supported []string) (retVerificatio return } -func (bcp *BaseChainParser) Construct(spec spectypes.Spec, taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, +func (bcp *BaseChainParser) Construct(spec spectypes.Spec, internalPaths map[string]struct{}, taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, serverApis map[ApiKey]ApiContainer, apiCollections map[CollectionKey]*spectypes.ApiCollection, headers map[ApiKey]*spectypes.Header, verifications map[VerificationKey][]VerificationContainer, extensionParser extensionslib.ExtensionParser, ) { bcp.spec = spec + bcp.internalPaths = internalPaths bcp.serverApis = serverApis bcp.taggedApis = taggedApis bcp.headers = headers @@ -282,7 +284,7 @@ func (bcp *BaseChainParser) extensionParsingInner(addon string, parsedMessageArg } // getSupportedApi fetches service api from spec by name -func (apip *BaseChainParser) getSupportedApi(name, connectionType string) (*ApiContainer, error) { +func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, error) { // Guard that the GrpcChainParser instance exists if apip == nil { return nil, errors.New("ChainParser not defined") @@ -293,10 +295,7 @@ func (apip *BaseChainParser) getSupportedApi(name, connectionType string) (*ApiC defer apip.rwLock.RUnlock() // Fetch server api by name - apiCont, ok := apip.serverApis[ApiKey{ - Name: name, - ConnectionType: connectionType, - }] + apiCont, ok := apip.serverApis[apiKey] // Return an error if spec does not exist if !ok { @@ -305,12 +304,20 @@ func (apip *BaseChainParser) getSupportedApi(name, connectionType string) (*ApiC // Return an error if api is disabled if !apiCont.api.Enabled { - return nil, utils.LavaFormatInfo("api is disabled", utils.Attribute{Key: "name", Value: name}, utils.Attribute{Key: "connectionType", Value: connectionType}) + return nil, utils.LavaFormatInfo("api is disabled", utils.Attribute{Key: "apiKey", Value: apiKey}) } return &apiCont, nil } +func (apip *BaseChainParser) isValidInternalPath(path string) bool { + if apip == nil || len(apip.internalPaths) == 0 { + return false + } + _, ok := apip.internalPaths[path] + return ok +} + // getSupportedApi fetches service api from spec by name func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addon string) (*spectypes.ApiCollection, error) { // Guard that the GrpcChainParser instance exists @@ -342,7 +349,8 @@ func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addo return api, nil } -func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map[ApiKey]ApiContainer, retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, retApiCollections map[CollectionKey]*spectypes.ApiCollection, retHeaders map[ApiKey]*spectypes.Header, retVerifications map[VerificationKey][]VerificationContainer) { +func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths map[string]struct{}, retServerApis map[ApiKey]ApiContainer, retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, retApiCollections map[CollectionKey]*spectypes.ApiCollection, retHeaders map[ApiKey]*spectypes.Header, retVerifications map[VerificationKey][]VerificationContainer) { + retInternalPaths = map[string]struct{}{} serverApis := map[ApiKey]ApiContainer{} taggedApis := map[spectypes.FUNCTION_TAG]TaggedContainer{} headers := map[ApiKey]*spectypes.Header{} @@ -361,6 +369,10 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map InternalPath: apiCollection.CollectionData.InternalPath, Addon: apiCollection.CollectionData.AddOn, } + + // add as a valid internal path + retInternalPaths[apiCollection.CollectionData.InternalPath] = struct{}{} + for _, parsing := range apiCollection.ParseDirectives { taggedApis[parsing.FunctionTag] = TaggedContainer{ Parsing: parsing, @@ -387,12 +399,34 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map collectionKey: collectionKey, } } else { - serverApis[ApiKey{ - Name: api.Name, - ConnectionType: collectionKey.ConnectionType, - }] = ApiContainer{ - api: api, - collectionKey: collectionKey, + // add another internal path entry so it can specifically be referenced + if apiCollection.CollectionData.InternalPath != "" { + serverApis[ApiKey{ + Name: api.Name, + ConnectionType: collectionKey.ConnectionType, + InternalPath: apiCollection.CollectionData.InternalPath, + }] = ApiContainer{ + api: api, + collectionKey: collectionKey, + } + // if it does not exist set it + if _, ok := serverApis[ApiKey{Name: api.Name, ConnectionType: collectionKey.ConnectionType}]; !ok { + serverApis[ApiKey{ + Name: api.Name, + ConnectionType: collectionKey.ConnectionType, + }] = ApiContainer{ + api: api, + collectionKey: collectionKey, + } + } + } else { + serverApis[ApiKey{ + Name: api.Name, + ConnectionType: collectionKey.ConnectionType, + }] = ApiContainer{ + api: api, + collectionKey: collectionKey, + } } } } @@ -438,7 +472,7 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map apiCollections[collectionKey] = apiCollection } } - return serverApis, taggedApis, apiCollections, headers, verifications + return retInternalPaths, serverApis, taggedApis, apiCollections, headers, verifications } func (bcp *BaseChainParser) ExtensionsParser() *extensionslib.ExtensionParser { diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index dfa76af899..8f1aa7cfbf 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -67,6 +67,7 @@ type ApiContainer struct { type ApiKey struct { Name string ConnectionType string + InternalPath string } type CollectionKey struct { diff --git a/protocol/chainlib/common_test.go b/protocol/chainlib/common_test.go index 235b1c4716..9f8c6680dc 100644 --- a/protocol/chainlib/common_test.go +++ b/protocol/chainlib/common_test.go @@ -384,7 +384,7 @@ func TestGetServiceApis(t *testing.T) { } rpcInterface := spectypes.APIInterfaceRest - serverApis, _, _, _, _ := getServiceApis(spec, rpcInterface) + _, serverApis, _, _, _, _ := getServiceApis(spec, rpcInterface) // Test serverApis if len(serverApis) != 3 { diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index 379972521f..109c341cd1 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -94,7 +94,8 @@ func (apip *GrpcChainParser) getSupportedApi(name, connectionType string) (*ApiC if apip == nil { return nil, errors.New("ChainParser not defined") } - return apip.BaseChainParser.getSupportedApi(name, connectionType) + apiKey := ApiKey{Name: name, ConnectionType: connectionType} + return apip.BaseChainParser.getSupportedApi(apiKey) } func (apip *GrpcChainParser) setupForConsumer(relayer grpcproxy.ProxyCallBack) { @@ -217,8 +218,8 @@ func (apip *GrpcChainParser) SetSpec(spec spectypes.Spec) { defer apip.rwLock.Unlock() // extract server and tagged apis from spec - serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceGrpc) - apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) + internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceGrpc) + apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) } // DataReliabilityParams returns data reliability params from spec (spec.enabled and spec.dataReliabilityThreshold) diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index 00b1b6e436..d5fbe9dabb 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -47,12 +47,13 @@ func (apip *JsonRPCChainParser) getApiCollection(connectionType, internalPath, a return apip.BaseChainParser.getApiCollection(connectionType, internalPath, addon) } -func (apip *JsonRPCChainParser) getSupportedApi(name, connectionType string) (*ApiContainer, error) { +func (apip *JsonRPCChainParser) getSupportedApi(name, connectionType string, internalPath string) (*ApiContainer, error) { // Guard that the JsonRPCChainParser instance exists if apip == nil { return nil, errors.New("ChainParser not defined") } - return apip.BaseChainParser.getSupportedApi(name, connectionType) + apiKey := ApiKey{Name: name, ConnectionType: connectionType, InternalPath: internalPath} + return apip.BaseChainParser.getSupportedApi(apiKey) } func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective, connectionType string, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) { @@ -71,7 +72,7 @@ func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective, Params: nil, BaseMessage: chainproxy.BaseMessage{Headers: metadata}, } - apiCont, err := apip.getSupportedApi(parsing.ApiName, connectionType) + apiCont, err := apip.getSupportedApi(parsing.ApiName, connectionType, "") if err != nil { return nil, err } @@ -103,8 +104,12 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType var latestRequestedBlock, earliestRequestedBlock int64 = 0, 0 for idx, msg := range msgs { var requestedBlockForMessage int64 + internalPath := "" + if apip.isValidInternalPath(url) { + internalPath = url + } // Check api is supported and save it in nodeMsg - apiCont, err := apip.getSupportedApi(msg.Method, connectionType) + apiCont, err := apip.getSupportedApi(msg.Method, connectionType, internalPath) if err != nil { utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err)) return nil, err @@ -232,8 +237,8 @@ func (apip *JsonRPCChainParser) SetSpec(spec spectypes.Spec) { defer apip.rwLock.Unlock() // extract server and tagged apis from spec - serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceJsonRPC) - apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) + internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceJsonRPC) + apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) } func (apip *JsonRPCChainParser) GetInternalPaths() map[string]struct{} { @@ -444,15 +449,17 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con logFormattedMsg = utils.FormatLongString(logFormattedMsg, relayMsgLogMaxChars) } + path := "/" + fiberCtx.Params("*") utils.LavaFormatDebug("in <<<", utils.LogAttr("GUID", ctx), + utils.LogAttr("path", path), utils.LogAttr("seed", msgSeed), utils.LogAttr("_msg", logFormattedMsg), utils.LogAttr("dappID", dappID), utils.LogAttr("headers", headers), ) refererMatch := fiberCtx.Params(refererMatchString, "") - relayResult, err := apil.relaySender.SendRelay(ctx, "", msg, http.MethodPost, dappID, consumerIp, metricsData, headers) + relayResult, err := apil.relaySender.SendRelay(ctx, path, msg, http.MethodPost, dappID, consumerIp, metricsData, headers) if refererMatch != "" && apil.refererData != nil && err == nil { go apil.refererData.SendReferer(refererMatch, chainID, msg, metadataValues, nil) } diff --git a/protocol/chainlib/jsonRPC_test.go b/protocol/chainlib/jsonRPC_test.go index 216f35db0c..b8556a3ce3 100644 --- a/protocol/chainlib/jsonRPC_test.go +++ b/protocol/chainlib/jsonRPC_test.go @@ -66,7 +66,7 @@ func TestJSONChainParser_NilGuard(t *testing.T) { apip.SetSpec(spectypes.Spec{}) apip.DataReliabilityParams() apip.ChainBlockStats() - apip.getSupportedApi("", "") + apip.getSupportedApi("", "", "") apip.ParseMsg("", []byte{}, "", nil, extensionslib.ExtensionInfo{LatestBlock: 0}) } @@ -77,7 +77,7 @@ func TestJSONGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - api, err := apip.getSupportedApi("API1", connectionType_test) + api, err := apip.getSupportedApi("API1", connectionType_test, "") assert.NoError(t, err) assert.Equal(t, "API1", api.api.Name) @@ -87,7 +87,7 @@ func TestJSONGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test) + _, err = apip.getSupportedApi("API2", connectionType_test, "") assert.Error(t, err) // Test case 3: Returns error if the API is disabled @@ -96,7 +96,7 @@ func TestJSONGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: false}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API1", connectionType_test) + _, err = apip.getSupportedApi("API1", connectionType_test, "") assert.Error(t, err) } @@ -341,3 +341,43 @@ func TestJsonRpcBatchCallSameID(t *testing.T) { } }() } + +func TestJsonRpcInternalPathsMultipleVersions(t *testing.T) { + ctx := context.Background() + serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"jsonrpc":"2.0","id":1,"result":"%s"}`, r.RequestURI) + }) + chainParser, chainProxy, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "STRK", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil) + require.NoError(t, err) + require.NotNil(t, chainParser) + require.NotNil(t, chainProxy) + require.NotNil(t, chainFetcher) + v5_path := "/rpc/v0_5" + v6_path := "/rpc/v0_6" + req_data := []byte(`{"jsonrpc": "2.0", "id": 1, "method": "starknet_specVersion", "params": []}`) + chainMessage, err := chainParser.ParseMsg("", req_data, http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + api := chainMessage.GetApi() + collection := chainMessage.GetApiCollection() + require.Equal(t, "starknet_specVersion", api.Name) + require.Equal(t, "", collection.CollectionData.InternalPath) + + chainMessage, err = chainParser.ParseMsg(v5_path, req_data, http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + api = chainMessage.GetApi() + collection = chainMessage.GetApiCollection() + require.Equal(t, "starknet_specVersion", api.Name) + require.Equal(t, v5_path, collection.CollectionData.InternalPath) + + chainMessage, err = chainParser.ParseMsg(v6_path, req_data, http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + api = chainMessage.GetApi() + collection = chainMessage.GetApiCollection() + require.Equal(t, "starknet_specVersion", api.Name) + require.Equal(t, v6_path, collection.CollectionData.InternalPath) + if closeServer != nil { + closeServer() + } +} diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index 8e8e3d09f3..c74897fae3 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -200,8 +200,8 @@ func (apip *RestChainParser) SetSpec(spec spectypes.Spec) { defer apip.rwLock.Unlock() // extract server and tagged apis from spec - serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceRest) - apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) + internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceRest) + apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) } // DataReliabilityParams returns data reliability params from spec (spec.enabled and spec.dataReliabilityThreshold) diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 9cfa8fbdfd..3cac7517d5 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -54,7 +54,8 @@ func (apip *TendermintChainParser) getSupportedApi(name, connectionType string) if apip == nil { return nil, errors.New("ChainParser not defined") } - return apip.BaseChainParser.getSupportedApi(name, connectionType) + apiKey := ApiKey{Name: name, ConnectionType: connectionType} + return apip.BaseChainParser.getSupportedApi(apiKey) } func (apip *TendermintChainParser) CraftMessage(parsing *spectypes.ParseDirective, connectionType string, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) { @@ -270,8 +271,8 @@ func (apip *TendermintChainParser) SetSpec(spec spectypes.Spec) { defer apip.rwLock.Unlock() // extract server and tagged apis from spec - serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceTendermintRPC) - apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) + internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceTendermintRPC) + apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser) } // DataReliabilityParams returns data reliability params from spec (spec.enabled and spec.dataReliabilityThreshold) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 2efc03251e..5ac4a2dbd2 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -754,7 +754,7 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe } relayLatency = time.Since(relaySentTime) if rpccs.debugRelays { - utils.LavaFormatDebug("sending relay to provider", + attributes := []utils.Attribute{ utils.LogAttr("GUID", ctx), utils.LogAttr("addon", relayRequest.RelayData.Addon), utils.LogAttr("extensions", relayRequest.RelayData.Extensions), @@ -769,7 +769,14 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe utils.LogAttr("latency", relayLatency), utils.LogAttr("replyErred", err != nil), utils.LogAttr("replyLatestBlock", reply.GetLatestBlock()), - ) + utils.LogAttr("method", chainMessage.GetApi().Name), + } + internalPath := chainMessage.GetApiCollection().CollectionData.InternalPath + if internalPath != "" { + attributes = append(attributes, utils.LogAttr("internal_path", internalPath), + utils.LogAttr("apiUrl", relayRequest.RelayData.ApiUrl)) + } + utils.LavaFormatDebug("sending relay to provider", attributes...) } if err != nil { backoff := false diff --git a/scripts/test_spec_full.sh b/scripts/test_spec_full.sh index 272c3f1ebf..b06291769b 100755 --- a/scripts/test_spec_full.sh +++ b/scripts/test_spec_full.sh @@ -207,7 +207,7 @@ done echo "[+]generated consumer config: $output_consumer_yaml" cat $output_consumer_yaml if [ "$dry" = false ]; then - screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer testutil/debugging/logs/consumer.yml $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/PORTAL.log" + screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer testutil/debugging/logs/consumer.yml $EXTRA_PORTAL_FLAGS --geolocation 1 --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/PORTAL.log" echo "[+] letting providers start and running health check then running command with flags: $test_consumer_command_args" sleep 10