Skip to content

Commit

Permalink
added multiple versions support with internal paths (lavanet#1394)
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet authored Apr 28, 2024
1 parent 1ec3d07 commit 067bfc1
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 38 deletions.
64 changes: 49 additions & 15 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type PolicyInf interface {
}

type BaseChainParser struct {
internalPaths map[string]struct{}
taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer
spec spectypes.Spec
rwLock sync.RWMutex
Expand Down Expand Up @@ -226,11 +227,12 @@ func (bcp *BaseChainParser) GetVerifications(supported []string) (retVerificatio
return
}

func (bcp *BaseChainParser) Construct(spec spectypes.Spec, taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer,
func (bcp *BaseChainParser) Construct(spec spectypes.Spec, internalPaths map[string]struct{}, taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer,
serverApis map[ApiKey]ApiContainer, apiCollections map[CollectionKey]*spectypes.ApiCollection, headers map[ApiKey]*spectypes.Header,
verifications map[VerificationKey][]VerificationContainer, extensionParser extensionslib.ExtensionParser,
) {
bcp.spec = spec
bcp.internalPaths = internalPaths
bcp.serverApis = serverApis
bcp.taggedApis = taggedApis
bcp.headers = headers
Expand Down Expand Up @@ -282,7 +284,7 @@ func (bcp *BaseChainParser) extensionParsingInner(addon string, parsedMessageArg
}

// getSupportedApi fetches service api from spec by name
func (apip *BaseChainParser) getSupportedApi(name, connectionType string) (*ApiContainer, error) {
func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, error) {
// Guard that the GrpcChainParser instance exists
if apip == nil {
return nil, errors.New("ChainParser not defined")
Expand All @@ -293,10 +295,7 @@ func (apip *BaseChainParser) getSupportedApi(name, connectionType string) (*ApiC
defer apip.rwLock.RUnlock()

// Fetch server api by name
apiCont, ok := apip.serverApis[ApiKey{
Name: name,
ConnectionType: connectionType,
}]
apiCont, ok := apip.serverApis[apiKey]

// Return an error if spec does not exist
if !ok {
Expand All @@ -305,12 +304,20 @@ func (apip *BaseChainParser) getSupportedApi(name, connectionType string) (*ApiC

// Return an error if api is disabled
if !apiCont.api.Enabled {
return nil, utils.LavaFormatInfo("api is disabled", utils.Attribute{Key: "name", Value: name}, utils.Attribute{Key: "connectionType", Value: connectionType})
return nil, utils.LavaFormatInfo("api is disabled", utils.Attribute{Key: "apiKey", Value: apiKey})
}

return &apiCont, nil
}

func (apip *BaseChainParser) isValidInternalPath(path string) bool {
if apip == nil || len(apip.internalPaths) == 0 {
return false
}
_, ok := apip.internalPaths[path]
return ok
}

// getSupportedApi fetches service api from spec by name
func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addon string) (*spectypes.ApiCollection, error) {
// Guard that the GrpcChainParser instance exists
Expand Down Expand Up @@ -342,7 +349,8 @@ func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addo
return api, nil
}

func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map[ApiKey]ApiContainer, retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, retApiCollections map[CollectionKey]*spectypes.ApiCollection, retHeaders map[ApiKey]*spectypes.Header, retVerifications map[VerificationKey][]VerificationContainer) {
func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths map[string]struct{}, retServerApis map[ApiKey]ApiContainer, retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, retApiCollections map[CollectionKey]*spectypes.ApiCollection, retHeaders map[ApiKey]*spectypes.Header, retVerifications map[VerificationKey][]VerificationContainer) {
retInternalPaths = map[string]struct{}{}
serverApis := map[ApiKey]ApiContainer{}
taggedApis := map[spectypes.FUNCTION_TAG]TaggedContainer{}
headers := map[ApiKey]*spectypes.Header{}
Expand All @@ -361,6 +369,10 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map
InternalPath: apiCollection.CollectionData.InternalPath,
Addon: apiCollection.CollectionData.AddOn,
}

// add as a valid internal path
retInternalPaths[apiCollection.CollectionData.InternalPath] = struct{}{}

for _, parsing := range apiCollection.ParseDirectives {
taggedApis[parsing.FunctionTag] = TaggedContainer{
Parsing: parsing,
Expand All @@ -387,12 +399,34 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map
collectionKey: collectionKey,
}
} else {
serverApis[ApiKey{
Name: api.Name,
ConnectionType: collectionKey.ConnectionType,
}] = ApiContainer{
api: api,
collectionKey: collectionKey,
// add another internal path entry so it can specifically be referenced
if apiCollection.CollectionData.InternalPath != "" {
serverApis[ApiKey{
Name: api.Name,
ConnectionType: collectionKey.ConnectionType,
InternalPath: apiCollection.CollectionData.InternalPath,
}] = ApiContainer{
api: api,
collectionKey: collectionKey,
}
// if it does not exist set it
if _, ok := serverApis[ApiKey{Name: api.Name, ConnectionType: collectionKey.ConnectionType}]; !ok {
serverApis[ApiKey{
Name: api.Name,
ConnectionType: collectionKey.ConnectionType,
}] = ApiContainer{
api: api,
collectionKey: collectionKey,
}
}
} else {
serverApis[ApiKey{
Name: api.Name,
ConnectionType: collectionKey.ConnectionType,
}] = ApiContainer{
api: api,
collectionKey: collectionKey,
}
}
}
}
Expand Down Expand Up @@ -438,7 +472,7 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retServerApis map
apiCollections[collectionKey] = apiCollection
}
}
return serverApis, taggedApis, apiCollections, headers, verifications
return retInternalPaths, serverApis, taggedApis, apiCollections, headers, verifications
}

func (bcp *BaseChainParser) ExtensionsParser() *extensionslib.ExtensionParser {
Expand Down
1 change: 1 addition & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type ApiContainer struct {
type ApiKey struct {
Name string
ConnectionType string
InternalPath string
}

type CollectionKey struct {
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TestGetServiceApis(t *testing.T) {
}

rpcInterface := spectypes.APIInterfaceRest
serverApis, _, _, _, _ := getServiceApis(spec, rpcInterface)
_, serverApis, _, _, _, _ := getServiceApis(spec, rpcInterface)

// Test serverApis
if len(serverApis) != 3 {
Expand Down
7 changes: 4 additions & 3 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func (apip *GrpcChainParser) getSupportedApi(name, connectionType string) (*ApiC
if apip == nil {
return nil, errors.New("ChainParser not defined")
}
return apip.BaseChainParser.getSupportedApi(name, connectionType)
apiKey := ApiKey{Name: name, ConnectionType: connectionType}
return apip.BaseChainParser.getSupportedApi(apiKey)
}

func (apip *GrpcChainParser) setupForConsumer(relayer grpcproxy.ProxyCallBack) {
Expand Down Expand Up @@ -217,8 +218,8 @@ func (apip *GrpcChainParser) SetSpec(spec spectypes.Spec) {
defer apip.rwLock.Unlock()

// extract server and tagged apis from spec
serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceGrpc)
apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceGrpc)
apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
}

// DataReliabilityParams returns data reliability params from spec (spec.enabled and spec.dataReliabilityThreshold)
Expand Down
21 changes: 14 additions & 7 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ func (apip *JsonRPCChainParser) getApiCollection(connectionType, internalPath, a
return apip.BaseChainParser.getApiCollection(connectionType, internalPath, addon)
}

func (apip *JsonRPCChainParser) getSupportedApi(name, connectionType string) (*ApiContainer, error) {
func (apip *JsonRPCChainParser) getSupportedApi(name, connectionType string, internalPath string) (*ApiContainer, error) {
// Guard that the JsonRPCChainParser instance exists
if apip == nil {
return nil, errors.New("ChainParser not defined")
}
return apip.BaseChainParser.getSupportedApi(name, connectionType)
apiKey := ApiKey{Name: name, ConnectionType: connectionType, InternalPath: internalPath}
return apip.BaseChainParser.getSupportedApi(apiKey)
}

func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective, connectionType string, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) {
Expand All @@ -71,7 +72,7 @@ func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective,
Params: nil,
BaseMessage: chainproxy.BaseMessage{Headers: metadata},
}
apiCont, err := apip.getSupportedApi(parsing.ApiName, connectionType)
apiCont, err := apip.getSupportedApi(parsing.ApiName, connectionType, "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -103,8 +104,12 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
var latestRequestedBlock, earliestRequestedBlock int64 = 0, 0
for idx, msg := range msgs {
var requestedBlockForMessage int64
internalPath := ""
if apip.isValidInternalPath(url) {
internalPath = url
}
// Check api is supported and save it in nodeMsg
apiCont, err := apip.getSupportedApi(msg.Method, connectionType)
apiCont, err := apip.getSupportedApi(msg.Method, connectionType, internalPath)
if err != nil {
utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err))
return nil, err
Expand Down Expand Up @@ -232,8 +237,8 @@ func (apip *JsonRPCChainParser) SetSpec(spec spectypes.Spec) {
defer apip.rwLock.Unlock()

// extract server and tagged apis from spec
serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceJsonRPC)
apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceJsonRPC)
apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
}

func (apip *JsonRPCChainParser) GetInternalPaths() map[string]struct{} {
Expand Down Expand Up @@ -444,15 +449,17 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
logFormattedMsg = utils.FormatLongString(logFormattedMsg, relayMsgLogMaxChars)
}

path := "/" + fiberCtx.Params("*")
utils.LavaFormatDebug("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("path", path),
utils.LogAttr("seed", msgSeed),
utils.LogAttr("_msg", logFormattedMsg),
utils.LogAttr("dappID", dappID),
utils.LogAttr("headers", headers),
)
refererMatch := fiberCtx.Params(refererMatchString, "")
relayResult, err := apil.relaySender.SendRelay(ctx, "", msg, http.MethodPost, dappID, consumerIp, metricsData, headers)
relayResult, err := apil.relaySender.SendRelay(ctx, path, msg, http.MethodPost, dappID, consumerIp, metricsData, headers)
if refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch, chainID, msg, metadataValues, nil)
}
Expand Down
48 changes: 44 additions & 4 deletions protocol/chainlib/jsonRPC_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestJSONChainParser_NilGuard(t *testing.T) {
apip.SetSpec(spectypes.Spec{})
apip.DataReliabilityParams()
apip.ChainBlockStats()
apip.getSupportedApi("", "")
apip.getSupportedApi("", "", "")
apip.ParseMsg("", []byte{}, "", nil, extensionslib.ExtensionInfo{LatestBlock: 0})
}

Expand All @@ -77,7 +77,7 @@ func TestJSONGetSupportedApi(t *testing.T) {
serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}},
},
}
api, err := apip.getSupportedApi("API1", connectionType_test)
api, err := apip.getSupportedApi("API1", connectionType_test, "")
assert.NoError(t, err)
assert.Equal(t, "API1", api.api.Name)

Expand All @@ -87,7 +87,7 @@ func TestJSONGetSupportedApi(t *testing.T) {
serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}},
},
}
_, err = apip.getSupportedApi("API2", connectionType_test)
_, err = apip.getSupportedApi("API2", connectionType_test, "")
assert.Error(t, err)

// Test case 3: Returns error if the API is disabled
Expand All @@ -96,7 +96,7 @@ func TestJSONGetSupportedApi(t *testing.T) {
serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: false}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}},
},
}
_, err = apip.getSupportedApi("API1", connectionType_test)
_, err = apip.getSupportedApi("API1", connectionType_test, "")
assert.Error(t, err)
}

Expand Down Expand Up @@ -341,3 +341,43 @@ func TestJsonRpcBatchCallSameID(t *testing.T) {
}
}()
}

func TestJsonRpcInternalPathsMultipleVersions(t *testing.T) {
ctx := context.Background()
serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Handle the incoming request and provide the desired response
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"jsonrpc":"2.0","id":1,"result":"%s"}`, r.RequestURI)
})
chainParser, chainProxy, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "STRK", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil)
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainProxy)
require.NotNil(t, chainFetcher)
v5_path := "/rpc/v0_5"
v6_path := "/rpc/v0_6"
req_data := []byte(`{"jsonrpc": "2.0", "id": 1, "method": "starknet_specVersion", "params": []}`)
chainMessage, err := chainParser.ParseMsg("", req_data, http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
api := chainMessage.GetApi()
collection := chainMessage.GetApiCollection()
require.Equal(t, "starknet_specVersion", api.Name)
require.Equal(t, "", collection.CollectionData.InternalPath)

chainMessage, err = chainParser.ParseMsg(v5_path, req_data, http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
api = chainMessage.GetApi()
collection = chainMessage.GetApiCollection()
require.Equal(t, "starknet_specVersion", api.Name)
require.Equal(t, v5_path, collection.CollectionData.InternalPath)

chainMessage, err = chainParser.ParseMsg(v6_path, req_data, http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
api = chainMessage.GetApi()
collection = chainMessage.GetApiCollection()
require.Equal(t, "starknet_specVersion", api.Name)
require.Equal(t, v6_path, collection.CollectionData.InternalPath)
if closeServer != nil {
closeServer()
}
}
4 changes: 2 additions & 2 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func (apip *RestChainParser) SetSpec(spec spectypes.Spec) {
defer apip.rwLock.Unlock()

// extract server and tagged apis from spec
serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceRest)
apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceRest)
apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
}

// DataReliabilityParams returns data reliability params from spec (spec.enabled and spec.dataReliabilityThreshold)
Expand Down
7 changes: 4 additions & 3 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (apip *TendermintChainParser) getSupportedApi(name, connectionType string)
if apip == nil {
return nil, errors.New("ChainParser not defined")
}
return apip.BaseChainParser.getSupportedApi(name, connectionType)
apiKey := ApiKey{Name: name, ConnectionType: connectionType}
return apip.BaseChainParser.getSupportedApi(apiKey)
}

func (apip *TendermintChainParser) CraftMessage(parsing *spectypes.ParseDirective, connectionType string, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) {
Expand Down Expand Up @@ -270,8 +271,8 @@ func (apip *TendermintChainParser) SetSpec(spec spectypes.Spec) {
defer apip.rwLock.Unlock()

// extract server and tagged apis from spec
serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceTendermintRPC)
apip.BaseChainParser.Construct(spec, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
internalPaths, serverApis, taggedApis, apiCollections, headers, verifications := getServiceApis(spec, spectypes.APIInterfaceTendermintRPC)
apip.BaseChainParser.Construct(spec, internalPaths, taggedApis, serverApis, apiCollections, headers, verifications, apip.BaseChainParser.extensionParser)
}

// DataReliabilityParams returns data reliability params from spec (spec.enabled and spec.dataReliabilityThreshold)
Expand Down
11 changes: 9 additions & 2 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe
}
relayLatency = time.Since(relaySentTime)
if rpccs.debugRelays {
utils.LavaFormatDebug("sending relay to provider",
attributes := []utils.Attribute{
utils.LogAttr("GUID", ctx),
utils.LogAttr("addon", relayRequest.RelayData.Addon),
utils.LogAttr("extensions", relayRequest.RelayData.Extensions),
Expand All @@ -769,7 +769,14 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe
utils.LogAttr("latency", relayLatency),
utils.LogAttr("replyErred", err != nil),
utils.LogAttr("replyLatestBlock", reply.GetLatestBlock()),
)
utils.LogAttr("method", chainMessage.GetApi().Name),
}
internalPath := chainMessage.GetApiCollection().CollectionData.InternalPath
if internalPath != "" {
attributes = append(attributes, utils.LogAttr("internal_path", internalPath),
utils.LogAttr("apiUrl", relayRequest.RelayData.ApiUrl))
}
utils.LavaFormatDebug("sending relay to provider", attributes...)
}
if err != nil {
backoff := false
Expand Down
2 changes: 1 addition & 1 deletion scripts/test_spec_full.sh
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ done
echo "[+]generated consumer config: $output_consumer_yaml"
cat $output_consumer_yaml
if [ "$dry" = false ]; then
screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer testutil/debugging/logs/consumer.yml $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/PORTAL.log"
screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer testutil/debugging/logs/consumer.yml $EXTRA_PORTAL_FLAGS --geolocation 1 --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/PORTAL.log"

echo "[+] letting providers start and running health check then running command with flags: $test_consumer_command_args"
sleep 10
Expand Down

0 comments on commit 067bfc1

Please sign in to comment.