Skip to content

Commit

Permalink
feat: support provider method routes (#1642)
Browse files Browse the repository at this point in the history
* finished, wip on tests

* fixed bugs, added tests and a config

* lint
  • Loading branch information
omerlavanet authored Aug 25, 2024
1 parent 4dd69db commit ca1828c
Show file tree
Hide file tree
Showing 6 changed files with 520 additions and 38 deletions.
35 changes: 35 additions & 0 deletions config/provider_examples/lava_example_archive_methodroute.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
endpoints:
- api-interface: tendermintrpc
chain-id: LAV1
network-address:
address: "127.0.0.1:2220"
node-urls:
- url: ws://127.0.0.1:26657/websocket
- url: http://127.0.0.1:26657
- url: http://127.0.0.1:26657
addons:
- archive
- url: https://trustless-api.com
methods:
- block
- block_by_hash
addons:
- archive
- api-interface: grpc
chain-id: LAV1
network-address:
address: "127.0.0.1:2220"
node-urls:
- url: 127.0.0.1:9090
- url: 127.0.0.1:9090
addons:
- archive
- api-interface: rest
chain-id: LAV1
network-address:
address: "127.0.0.1:2220"
node-urls:
- url: http://127.0.0.1:1317
- url: http://127.0.0.1:1317
addons:
- archive
140 changes: 104 additions & 36 deletions protocol/chainlib/chain_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ import (
"google.golang.org/grpc/metadata"
)

type MethodRoute struct {
lavasession.RouterKey
method string
}

type chainRouterEntry struct {
ChainProxy
addonsSupported map[string]struct{}
methodsRouted map[string]struct{}
}

func (cre *chainRouterEntry) isSupporting(addon string) bool {
Expand All @@ -36,13 +42,26 @@ type chainRouterImpl struct {
chainProxyRouter map[lavasession.RouterKey][]chainRouterEntry
}

func (cri *chainRouterImpl) getChainProxySupporting(ctx context.Context, addon string, extensions []string) (ChainProxy, error) {
func (cri *chainRouterImpl) GetChainProxySupporting(ctx context.Context, addon string, extensions []string, method string) (ChainProxy, error) {
cri.lock.RLock()
defer cri.lock.RUnlock()

// check if that specific method has a special route, if it does apply it to the router key
wantedRouterKey := lavasession.NewRouterKey(extensions)
if chainProxyEntries, ok := cri.chainProxyRouter[wantedRouterKey]; ok {
for _, chainRouterEntry := range chainProxyEntries {
if chainRouterEntry.isSupporting(addon) {
// check if the method is supported
if len(chainRouterEntry.methodsRouted) > 0 {
if _, ok := chainRouterEntry.methodsRouted[method]; !ok {
continue
}
utils.LavaFormatTrace("chainProxy supporting method routing selected",
utils.LogAttr("addon", addon),
utils.LogAttr("wantedRouterKey", wantedRouterKey),
utils.LogAttr("method", method),
)
}
if wantedRouterKey != lavasession.GetEmptyRouterKey() { // add trailer only when router key is not default (||)
grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeExtension, string(wantedRouterKey)))
}
Expand Down Expand Up @@ -70,7 +89,7 @@ func (cri chainRouterImpl) ExtensionsSupported(extensions []string) bool {
func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) {
// add the parsed addon from the apiCollection
addon := chainMessage.GetApiCollection().CollectionData.AddOn
selectedChainProxy, err := cri.getChainProxySupporting(ctx, addon, extensions)
selectedChainProxy, err := cri.GetChainProxySupporting(ctx, addon, extensions, chainMessage.GetApi().Name)
if err != nil {
return nil, "", nil, common.NodeUrl{}, "", err
}
Expand All @@ -80,55 +99,83 @@ func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{},
}

// batch nodeUrls with the same addons together in a copy
func batchNodeUrlsByServices(rpcProviderEndpoint lavasession.RPCProviderEndpoint) map[lavasession.RouterKey]lavasession.RPCProviderEndpoint {
func (cri *chainRouterImpl) BatchNodeUrlsByServices(rpcProviderEndpoint lavasession.RPCProviderEndpoint) (map[lavasession.RouterKey]lavasession.RPCProviderEndpoint, error) {
returnedBatch := map[lavasession.RouterKey]lavasession.RPCProviderEndpoint{}
routesToCheck := map[lavasession.RouterKey]bool{}
methodRoutes := map[string]int{}
for _, nodeUrl := range rpcProviderEndpoint.NodeUrls {
routerKey := lavasession.NewRouterKey(nodeUrl.Addons)

u, err := url.Parse(nodeUrl.Url)
// Some parsing may fail because of gRPC
if err == nil && (u.Scheme == "ws" || u.Scheme == "wss") {
// if websocket, check if we have a router key for http already. if not add a websocket router key
// so in case we didn't get an http endpoint, we can use the ws one.
if _, ok := returnedBatch[routerKey]; !ok {
returnedBatch[routerKey] = lavasession.RPCProviderEndpoint{
NetworkAddress: rpcProviderEndpoint.NetworkAddress,
ChainID: rpcProviderEndpoint.ChainID,
ApiInterface: rpcProviderEndpoint.ApiInterface,
Geolocation: rpcProviderEndpoint.Geolocation,
NodeUrls: []common.NodeUrl{nodeUrl}, // add existing nodeUrl to the batch
}
if len(nodeUrl.Methods) > 0 {
// all methods defined here will go to the same batch
methodRoutesUnique := strings.Join(nodeUrl.Methods, ",")
var existing int
var ok bool
if existing, ok = methodRoutes[methodRoutesUnique]; !ok {
methodRoutes[methodRoutesUnique] = len(methodRoutes)
existing = len(methodRoutes)
}

// now change the router key to fit the websocket extension key.
nodeUrl.Addons = append(nodeUrl.Addons, WebSocketExtension)
routerKey = lavasession.NewRouterKey(nodeUrl.Addons)
routerKey = routerKey.ApplyMethodsRoute(existing)
}
cri.parseNodeUrl(nodeUrl, returnedBatch, routerKey, rpcProviderEndpoint)
}
if len(returnedBatch) == 0 {
return nil, utils.LavaFormatError("invalid batch, routes are empty", nil, utils.LogAttr("endpoint", rpcProviderEndpoint))
}
// validate all defined method routes have a regular route
for routerKey, valid := range routesToCheck {
if !valid {
return nil, utils.LavaFormatError("invalid batch, missing regular route for method route", nil, utils.LogAttr("routerKey", routerKey))
}
}
return returnedBatch, nil
}

if existingEndpoint, ok := returnedBatch[routerKey]; !ok {
func (*chainRouterImpl) parseNodeUrl(nodeUrl common.NodeUrl, returnedBatch map[lavasession.RouterKey]lavasession.RPCProviderEndpoint, routerKey lavasession.RouterKey, rpcProviderEndpoint lavasession.RPCProviderEndpoint) {
u, err := url.Parse(nodeUrl.Url)
// Some parsing may fail because of gRPC
if err == nil && (u.Scheme == "ws" || u.Scheme == "wss") {
// if websocket, check if we have a router key for http already. if not add a websocket router key
// so in case we didn't get an http endpoint, we can use the ws one.
if _, ok := returnedBatch[routerKey]; !ok {
returnedBatch[routerKey] = lavasession.RPCProviderEndpoint{
NetworkAddress: rpcProviderEndpoint.NetworkAddress,
ChainID: rpcProviderEndpoint.ChainID,
ApiInterface: rpcProviderEndpoint.ApiInterface,
Geolocation: rpcProviderEndpoint.Geolocation,
NodeUrls: []common.NodeUrl{nodeUrl}, // add existing nodeUrl to the batch
NodeUrls: []common.NodeUrl{nodeUrl},
}
} else {
// setting the incoming url first as it might be http while existing is websocket. (we prioritize http over ws when possible)
existingEndpoint.NodeUrls = append([]common.NodeUrl{nodeUrl}, existingEndpoint.NodeUrls...)
returnedBatch[routerKey] = existingEndpoint
}
// now change the router key to fit the websocket extension key.
nodeUrl.Addons = append(nodeUrl.Addons, WebSocketExtension)
routerKey = lavasession.NewRouterKey(nodeUrl.Addons)
}

return returnedBatch
if existingEndpoint, ok := returnedBatch[routerKey]; !ok {
returnedBatch[routerKey] = lavasession.RPCProviderEndpoint{
NetworkAddress: rpcProviderEndpoint.NetworkAddress,
ChainID: rpcProviderEndpoint.ChainID,
ApiInterface: rpcProviderEndpoint.ApiInterface,
Geolocation: rpcProviderEndpoint.Geolocation,
NodeUrls: []common.NodeUrl{nodeUrl},
}
} else {
// setting the incoming url first as it might be http while existing is websocket. (we prioritize http over ws when possible)
existingEndpoint.NodeUrls = append([]common.NodeUrl{nodeUrl}, existingEndpoint.NodeUrls...)
returnedBatch[routerKey] = existingEndpoint
}
}

func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavasession.RPCProviderEndpoint, chainParser ChainParser, proxyConstructor func(context.Context, uint, lavasession.RPCProviderEndpoint, ChainParser) (ChainProxy, error)) (ChainRouter, error) {
func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavasession.RPCProviderEndpoint, chainParser ChainParser, proxyConstructor func(context.Context, uint, lavasession.RPCProviderEndpoint, ChainParser) (ChainProxy, error)) (*chainRouterImpl, error) {
chainProxyRouter := map[lavasession.RouterKey][]chainRouterEntry{}

cri := chainRouterImpl{
lock: &sync.RWMutex{},
}
requiredMap := map[requirementSt]struct{}{}
supportedMap := map[requirementSt]struct{}{}
rpcProviderEndpointBatch := batchNodeUrlsByServices(rpcProviderEndpoint)
rpcProviderEndpointBatch, err := cri.BatchNodeUrlsByServices(rpcProviderEndpoint)
if err != nil {
return nil, err
}
for _, rpcProviderEndpointEntry := range rpcProviderEndpointBatch {
addons, extensions, err := chainParser.SeparateAddonsExtensions(append(rpcProviderEndpointEntry.NodeUrls[0].Addons, ""))
if err != nil {
Expand All @@ -151,6 +198,14 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase
return allExtensionsRouterKey
}
routerKey := updateRouteCombinations(extensions, addons)
methodsRouted := map[string]struct{}{}
methods := rpcProviderEndpointEntry.NodeUrls[0].Methods
if len(methods) > 0 {
for _, method := range methods {
methodsRouted[method] = struct{}{}
}
}

chainProxy, err := proxyConstructor(ctx, nConns, rpcProviderEndpointEntry, chainParser)
if err != nil {
// TODO: allow some urls to be down
Expand All @@ -159,11 +214,17 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase
chainRouterEntryInst := chainRouterEntry{
ChainProxy: chainProxy,
addonsSupported: addonsSupportedMap,
methodsRouted: methodsRouted,
}
if chainRouterEntries, ok := chainProxyRouter[routerKey]; !ok {
chainProxyRouter[routerKey] = []chainRouterEntry{chainRouterEntryInst}
} else {
chainProxyRouter[routerKey] = append(chainRouterEntries, chainRouterEntryInst)
if len(methodsRouted) > 0 {
// if there are routed methods we want this in the beginning to intercept them
chainProxyRouter[routerKey] = append([]chainRouterEntry{chainRouterEntryInst}, chainRouterEntries...)
} else {
chainProxyRouter[routerKey] = append(chainRouterEntries, chainRouterEntryInst)
}
}
}
if len(requiredMap) > len(supportedMap) {
Expand All @@ -189,11 +250,18 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase
}
}

cri := chainRouterImpl{
lock: &sync.RWMutex{},
chainProxyRouter: chainProxyRouter,
// make sure all chainProxyRouter entries have one without a method routing
for routerKey, chainRouterEntries := range chainProxyRouter {
// get the last entry, if it has methods routed, we need to error out
lastEntry := chainRouterEntries[len(chainRouterEntries)-1]
if len(lastEntry.methodsRouted) > 0 {
return nil, utils.LavaFormatError("last entry in chainProxyRouter has methods routed, this means no chainProxy supports all methods", nil, utils.LogAttr("routerKey", routerKey))
}
}
return cri, nil

cri.chainProxyRouter = chainProxyRouter

return &cri, nil
}

type requirementSt struct {
Expand Down
Loading

0 comments on commit ca1828c

Please sign in to comment.