From 501d7da8d675862b1e8f9563bb63613a9b47dcbf Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Tue, 30 Apr 2024 19:43:33 +0200 Subject: [PATCH] fix: PRT - solving caching errors forever issue (#1388) * changing protobuf * compiling protobuf * adjusting chain fetcher * fixing chain router * chainlib interface adjustment * creating relayWrapper * fixing grpc test * fixing jsonrpc test * fixing rest tests * fixing grpc * fixing jsonrpc * fixing rest * fixing tendermint tests * fixing tendermint * propagating http requests status code * commenting todos * adding status code validation to rpc provider * adding node error validation to rpc consumer * fixing reliability manager * adding command * setting with limited ttl * adding server flags * space fix * fix lint * fix lint * fix pointer deref * checking nil * checking nil * checking nil 2 * checking nil 3 * checking nil 4 * checking nil 5 * checking nil 6 --- ecosystem/cache/command.go | 1 + ecosystem/cache/handlers.go | 10 +- ecosystem/cache/server.go | 15 ++- proto/lavanet/lava/pairing/relayCache.proto | 1 + protocol/chainlib/chain_fetcher.go | 23 ++-- protocol/chainlib/chain_router.go | 3 +- protocol/chainlib/chainlib.go | 4 +- .../chainlib/chainproxy/rpcclient/http.go | 1 + protocol/chainlib/common.go | 5 + protocol/chainlib/grpc.go | 55 +++++--- protocol/chainlib/grpc_test.go | 4 +- protocol/chainlib/jsonRPC.go | 20 ++- protocol/chainlib/jsonRPC_test.go | 6 +- protocol/chainlib/rest.go | 15 ++- protocol/chainlib/rest_test.go | 4 +- protocol/chainlib/tendermintRPC.go | 24 ++-- protocol/chainlib/tendermintRPC_test.go | 4 +- protocol/rpcconsumer/rpcconsumer_server.go | 6 +- .../reliabilitymanager/reliability_manager.go | 6 +- protocol/rpcprovider/rpcprovider_server.go | 23 +++- x/pairing/types/relayCache.pb.go | 123 ++++++++++++------ 21 files changed, 241 insertions(+), 112 deletions(-) diff --git a/ecosystem/cache/command.go b/ecosystem/cache/command.go index 3d1dbf640b..514e802210 100644 --- a/ecosystem/cache/command.go +++ b/ecosystem/cache/command.go @@ -42,6 +42,7 @@ longer DefaultExpirationForNonFinalized will reduce sync QoS for "latest" reques cacheCmd.Flags().String(FlagLogLevel, zerolog.InfoLevel.String(), "The logging level (trace|debug|info|warn|error|fatal|panic)") cacheCmd.Flags().Duration(ExpirationFlagName, DefaultExpirationTimeFinalized, "how long does a cache entry lasts in the cache for a finalized entry") cacheCmd.Flags().Duration(ExpirationNonFinalizedFlagName, DefaultExpirationForNonFinalized, "how long does a cache entry lasts in the cache for a non finalized entry") + cacheCmd.Flags().Duration(ExpirationNodeErrorsOnFinalizedFlagName, DefaultExpirationNodeErrors, "how long does a cache entry lasts in the cache for a finalized node error entry") cacheCmd.Flags().String(FlagMetricsAddress, DisabledFlagOption, "address to listen to prometheus metrics 127.0.0.1:5555, later you can curl http://127.0.0.1:5555/metrics") cacheCmd.Flags().Int64(FlagCacheSizeName, 2*1024*1024*1024, "the maximal amount of entries to save") return cacheCmd diff --git a/ecosystem/cache/handlers.go b/ecosystem/cache/handlers.go index 54c6e21de9..d3c6b34db2 100644 --- a/ecosystem/cache/handlers.go +++ b/ecosystem/cache/handlers.go @@ -263,11 +263,17 @@ func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairin utils.Attribute{Key: "requested_block", Value: relayCacheSet.RequestedBlock}, utils.Attribute{Key: "response_data", Value: parser.CapStringLen(string(relayCacheSet.Response.Data))}, utils.Attribute{Key: "requestHash", Value: string(relayCacheSet.BlockHash)}, - utils.Attribute{Key: "latestKnownBlock", Value: string(relayCacheSet.BlockHash)}) + utils.Attribute{Key: "latestKnownBlock", Value: string(relayCacheSet.BlockHash)}, + utils.Attribute{Key: "IsNodeError", Value: relayCacheSet.IsNodeError}, + ) // finalized entries can stay there if relayCacheSet.Finalized { cache := s.CacheServer.finalizedCache - cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.CacheServer.ExpirationFinalized) + if relayCacheSet.IsNodeError { + cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.CacheServer.ExpirationNodeErrors) + } else { + cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.CacheServer.ExpirationFinalized) + } } else { cache := s.CacheServer.tempCache cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.getExpirationForChain(time.Duration(relayCacheSet.AverageBlockTime), relayCacheSet.BlockHash)) diff --git a/ecosystem/cache/server.go b/ecosystem/cache/server.go index 69bc0eb0a4..b7dd43b862 100644 --- a/ecosystem/cache/server.go +++ b/ecosystem/cache/server.go @@ -23,12 +23,14 @@ import ( ) const ( - ExpirationFlagName = "expiration" - ExpirationNonFinalizedFlagName = "expiration-non-finalized" - FlagCacheSizeName = "max-items" - DefaultExpirationForNonFinalized = 500 * time.Millisecond - DefaultExpirationTimeFinalized = time.Hour - CacheNumCounters = 100000000 // expect 10M items + ExpirationFlagName = "expiration" + ExpirationNonFinalizedFlagName = "expiration-non-finalized" + ExpirationNodeErrorsOnFinalizedFlagName = "expiration-finalized-node-errors" + FlagCacheSizeName = "max-items" + DefaultExpirationForNonFinalized = 500 * time.Millisecond + DefaultExpirationTimeFinalized = time.Hour + DefaultExpirationNodeErrors = 5 * time.Second + CacheNumCounters = 100000000 // expect 10M items ) type CacheServer struct { @@ -36,6 +38,7 @@ type CacheServer struct { tempCache *ristretto.Cache ExpirationFinalized time.Duration ExpirationNonFinalized time.Duration + ExpirationNodeErrors time.Duration CacheMetrics *CacheMetrics CacheMaxCost int64 } diff --git a/proto/lavanet/lava/pairing/relayCache.proto b/proto/lavanet/lava/pairing/relayCache.proto index 8ad1e37387..28ddd120ee 100644 --- a/proto/lavanet/lava/pairing/relayCache.proto +++ b/proto/lavanet/lava/pairing/relayCache.proto @@ -50,4 +50,5 @@ message RelayCacheSet { string chain_id = 9; // used to set latest block per chain. int64 seen_block = 10; int64 average_block_time = 11; + bool is_node_error = 12; // node errors wont be cached for long even if they are finalized in cases where it returns a valid response later on } \ No newline at end of file diff --git a/protocol/chainlib/chain_fetcher.go b/protocol/chainlib/chain_fetcher.go index 5397e7e3c5..1fd69a0e8a 100644 --- a/protocol/chainlib/chain_fetcher.go +++ b/protocol/chainlib/chain_fetcher.go @@ -169,8 +169,11 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon if err != nil { return utils.LavaFormatWarning("[-] verify failed sending chainMessage", err, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) } + if reply == nil || reply.RelayReply == nil { + return utils.LavaFormatWarning("[-] verify failed sending chainMessage, reply or reply.RelayReply are nil", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) + } - parserInput, err := FormatResponseForParsing(reply, chainMessage) + parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) if err != nil { return utils.LavaFormatWarning("[-] verify failed to parse result", err, utils.LogAttr("chain_id", chainId), @@ -184,7 +187,7 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon {Key: "chainId", Value: chainId}, {Key: "nodeUrl", Value: proxyUrl.Url}, {Key: "Method", Value: parsing.GetApiName()}, - {Key: "Response", Value: string(reply.Data)}, + {Key: "Response", Value: string(reply.RelayReply.Data)}, }...) } if verification.LatestDistance != 0 && latestBlock != 0 && verification.ParseDirective.FunctionTag != spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM { @@ -194,7 +197,7 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon {Key: "chainId", Value: chainId}, {Key: "nodeUrl", Value: proxyUrl.Url}, {Key: "Method", Value: parsing.GetApiName()}, - {Key: "Response", Value: string(reply.Data)}, + {Key: "Response", Value: string(reply.RelayReply.Data)}, {Key: "parsedResult", Value: parsedResult}, }...) } @@ -271,13 +274,13 @@ func (cf *ChainFetcher) FetchLatestBlockNum(ctx context.Context) (int64, error) if err != nil { return spectypes.NOT_APPLICABLE, utils.LavaFormatDebug(tagName+" failed sending chainMessage", []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}, {Key: "error", Value: err}}...) } - parserInput, err := FormatResponseForParsing(reply, chainMessage) + parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) if err != nil { return spectypes.NOT_APPLICABLE, utils.LavaFormatDebug(tagName+" Failed formatResponseForParsing", []utils.Attribute{ {Key: "chainId", Value: chainId}, {Key: "nodeUrl", Value: proxyUrl.Url}, {Key: "Method", Value: parsing.ApiName}, - {Key: "Response", Value: string(reply.Data)}, + {Key: "Response", Value: string(reply.RelayReply.Data)}, {Key: "error", Value: err}, }...) } @@ -287,7 +290,7 @@ func (cf *ChainFetcher) FetchLatestBlockNum(ctx context.Context) (int64, error) {Key: "chainId", Value: chainId}, {Key: "nodeUrl", Value: proxyUrl.Url}, {Key: "Method", Value: parsing.ApiName}, - {Key: "Response", Value: string(reply.Data)}, + {Key: "Response", Value: string(reply.RelayReply.Data)}, {Key: "error", Value: err}, }...) } @@ -331,14 +334,14 @@ func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64) timeTaken := time.Since(start) return "", utils.LavaFormatDebug(tagName+" failed sending chainMessage", []utils.Attribute{{Key: "sendTime", Value: timeTaken}, {Key: "error", Value: err}, {Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) } - parserInput, err := FormatResponseForParsing(reply, chainMessage) + parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) if err != nil { return "", utils.LavaFormatDebug(tagName+" Failed formatResponseForParsing", []utils.Attribute{ {Key: "error", Value: err}, {Key: "chainId", Value: chainId}, {Key: "nodeUrl", Value: proxyUrl.Url}, {Key: "Method", Value: parsing.ApiName}, - {Key: "Response", Value: string(reply.Data)}, + {Key: "Response", Value: string(reply.RelayReply.Data)}, }...) } @@ -349,14 +352,14 @@ func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64) {Key: "chainId", Value: chainId}, {Key: "nodeUrl", Value: proxyUrl.Url}, {Key: "Method", Value: parsing.ApiName}, - {Key: "Response", Value: string(reply.Data)}, + {Key: "Response", Value: string(reply.RelayReply.Data)}, }...) } _, _, blockDistanceToFinalization, _ := cf.chainParser.ChainBlockStats() latestBlock := atomic.LoadInt64(&cf.latestBlock) // assuming FetchLatestBlockNum is called before this one it's always true if latestBlock > 0 { finalized := spectypes.IsFinalizedBlock(blockNum, latestBlock, blockDistanceToFinalization) - cf.populateCache(cf.constructRelayData(collectionData.Type, path, data, blockNum, "", nil, latestBlock), reply, []byte(res), finalized) + cf.populateCache(cf.constructRelayData(collectionData.Type, path, data, blockNum, "", nil, latestBlock), reply.RelayReply, []byte(res), finalized) } return res, nil } diff --git a/protocol/chainlib/chain_router.go b/protocol/chainlib/chain_router.go index b375f0f69c..573ba5cbb3 100644 --- a/protocol/chainlib/chain_router.go +++ b/protocol/chainlib/chain_router.go @@ -8,7 +8,6 @@ import ( "github.com/lavanet/lava/protocol/common" "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/utils" - pairingtypes "github.com/lavanet/lava/x/pairing/types" ) type chainRouterEntry struct { @@ -57,7 +56,7 @@ func (cri chainRouterImpl) ExtensionsSupported(extensions []string) bool { return ok } -func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) { +func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) { // add the parsed addon from the apiCollection addon := chainMessage.GetApiCollection().CollectionData.AddOn selectedChainProxy, err := cri.getChainProxySupporting(addon, extensions) diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 5be294391c..597b82e44f 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -113,13 +113,13 @@ type ChainListener interface { } type ChainRouter interface { - SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) // has to be thread safe, reuse code within ParseMsg as common functionality + SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) // has to be thread safe, reuse code within ParseMsg as common functionality ExtensionsSupported([]string) bool } type ChainProxy interface { GetChainProxyInformation() (common.NodeUrl, string) - SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) // has to be thread safe, reuse code within ParseMsg as common functionality + SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) // has to be thread safe, reuse code within ParseMsg as common functionality } func GetChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, chainParser ChainParser) (ChainRouter, error) { diff --git a/protocol/chainlib/chainproxy/rpcclient/http.go b/protocol/chainlib/chainproxy/rpcclient/http.go index 87640cd6f0..1d5c230730 100755 --- a/protocol/chainlib/chainproxy/rpcclient/http.go +++ b/protocol/chainlib/chainproxy/rpcclient/http.go @@ -203,6 +203,7 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}, isJsonRPC bo resp, err := hc.client.Do(req) if resp != nil { // resp can be non nil on error + metadata.AppendToOutgoingContext(ctx, common.StatusCodeMetadataKey, strconv.Itoa(resp.StatusCode)) trailer := metadata.Pairs(common.StatusCodeMetadataKey, strconv.Itoa(resp.StatusCode)) grpc.SetTrailer(ctx, trailer) // we ignore this error here since this code can be triggered not from grpc } diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 8f1aa7cfbf..1d10200eee 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -32,6 +32,11 @@ const ( var InvalidResponses = []string{"null", "", "nil", "undefined"} +type RelayReplyWrapper struct { + StatusCode int + RelayReply *pairingtypes.RelayReply +} + type VerificationKey struct { Extension string Addon string diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index 109c341cd1..b75877d4e0 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -42,6 +42,8 @@ import ( "google.golang.org/grpc/status" ) +const GRPCStatusCodeOnFailedMessages = 32 + type GrpcNodeErrorResponse struct { ErrorMessage string `json:"error_message"` ErrorCode uint32 `json:"error_code"` @@ -427,7 +429,7 @@ func newGrpcChainProxy(ctx context.Context, averageBlockTime time.Duration, pars return cp, nil } -func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { +func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { if ch != nil { return nil, "", nil, utils.LavaFormatError("Subscribe is not allowed on grpc", nil, utils.Attribute{Key: "GUID", Value: ctx}) } @@ -531,6 +533,21 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, connectCtx, cancel := cp.CapTimeoutForSend(ctx, chainMessage) defer cancel() err = conn.Invoke(connectCtx, "/"+nodeMessage.Path, msg, response, grpc.Header(&respHeaders)) + // Extract status code from response headers + statusCodeHeader := respHeaders.Get("grpc-status") + if len(statusCodeHeader) > 0 { + statusCodeTest, err := strconv.Atoi(statusCodeHeader[0]) + if err != nil { + // Handle error + utils.LavaFormatError("Error:", err, utils.LogAttr("statusCode", statusCodeTest)) + } else { + // Use the status code + utils.LavaFormatDebug("Status Code:", utils.LogAttr("statusCode", statusCodeTest)) + } + } else { + utils.LavaFormatDebug("NO Status Code:") + // No status code found in response headers + } if err != nil { // Validate if the error is related to the provider connection to the node or it is a valid error // in case the error is valid (e.g. bad input parameters) the error will return in the form of a valid error reply @@ -538,13 +555,16 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, return nil, "", nil, parsedError } // return the node's error back to the client as the error type is a invalid request which is cu deductible - respBytes, handlingError := parseGrpcNodeErrorToReply(ctx, err) + respBytes, statusCode, handlingError := parseGrpcNodeErrorToReply(ctx, err) if handlingError != nil { return nil, "", nil, handlingError } - reply := &pairingtypes.RelayReply{ - Data: respBytes, - Metadata: convertToMetadataMapOfSlices(respHeaders), + reply := &RelayReplyWrapper{ + StatusCode: int(statusCode), + RelayReply: &pairingtypes.RelayReply{ + Data: respBytes, + Metadata: convertToMetadataMapOfSlices(respHeaders), + }, } return reply, "", nil, nil } @@ -554,10 +574,12 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, if err != nil { return nil, "", nil, utils.LavaFormatError("proto.Marshal(response) Failed", err, utils.Attribute{Key: "GUID", Value: ctx}) } - - reply := &pairingtypes.RelayReply{ - Data: respBytes, - Metadata: convertToMetadataMapOfSlices(respHeaders), + reply := &RelayReplyWrapper{ + StatusCode: http.StatusOK, // status code is used only for rest at the moment + RelayReply: &pairingtypes.RelayReply{ + Data: respBytes, + Metadata: convertToMetadataMapOfSlices(respHeaders), + }, } return reply, "", nil, nil } @@ -565,21 +587,24 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, // This method assumes that the error is due to misuse of the request arguments, meaning the user would like to get // the response from the server to fix the request arguments. this method will make sure the user will get the response // from the node in the same format as expected. -func parseGrpcNodeErrorToReply(ctx context.Context, err error) ([]byte, error) { +func parseGrpcNodeErrorToReply(ctx context.Context, err error) ([]byte, uint32, error) { var respBytes []byte var marshalingError error + var errorCode uint32 = GRPCStatusCodeOnFailedMessages + // try fetching status code from error or otherwise use the GRPCStatusCodeOnFailedMessages if statusError, ok := status.FromError(err); ok { - respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: statusError.Message(), ErrorCode: uint32(statusError.Code())}) + errorCode = uint32(statusError.Code()) + respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: statusError.Message(), ErrorCode: errorCode}) if marshalingError != nil { - return nil, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 1", err, utils.Attribute{Key: "GUID", Value: ctx}) + return nil, errorCode, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 1", err, utils.Attribute{Key: "GUID", Value: ctx}) } } else { - respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: err.Error(), ErrorCode: uint32(32)}) + respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: err.Error(), ErrorCode: errorCode}) if marshalingError != nil { - return nil, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 2", err, utils.Attribute{Key: "GUID", Value: ctx}) + return nil, errorCode, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 2", err, utils.Attribute{Key: "GUID", Value: ctx}) } } - return respBytes, nil + return respBytes, errorCode, nil } func marshalJSON(msg proto.Message) ([]byte, error) { diff --git a/protocol/chainlib/grpc_test.go b/protocol/chainlib/grpc_test.go index d5404435c7..23f770c1fe 100644 --- a/protocol/chainlib/grpc_test.go +++ b/protocol/chainlib/grpc_test.go @@ -214,7 +214,7 @@ func TestParsingRequestedBlocksHeadersGrpc(t *testing.T) { require.Equal(t, test.requestedBlock, requestedBlock) reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) require.NoError(t, err) - parserInput, err := FormatResponseForParsing(reply, chainMessage) + parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) require.NoError(t, err) blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing) require.NoError(t, err) @@ -283,7 +283,7 @@ func TestSettingBlocksHeadersGrpc(t *testing.T) { require.Equal(t, test.block, requestedBlock) reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) require.NoError(t, err) - parserInput, err := FormatResponseForParsing(reply, chainMessage) + parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) require.NoError(t, err) blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing) require.NoError(t, err) diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index d5fbe9dabb..dcdd78d099 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -593,7 +593,7 @@ func (cp *JrpcChainProxy) start(ctx context.Context, nConns uint, nodeUrl common return nil } -func (cp *JrpcChainProxy) sendBatchMessage(ctx context.Context, nodeMessage *rpcInterfaceMessages.JsonrpcBatchMessage, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, err error) { +func (cp *JrpcChainProxy) sendBatchMessage(ctx context.Context, nodeMessage *rpcInterfaceMessages.JsonrpcBatchMessage, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, err error) { internalPath := chainMessage.GetApiCollection().CollectionData.InternalPath rpc, err := cp.conn[internalPath].GetRpc(ctx, true) if err != nil { @@ -634,13 +634,17 @@ func (cp *JrpcChainProxy) sendBatchMessage(ctx context.Context, nodeMessage *rpc if err != nil { return nil, err } - reply := &pairingtypes.RelayReply{ - Data: retData, + reply := &RelayReplyWrapper{ + StatusCode: http.StatusOK, // status code is used only for rest at the moment + + RelayReply: &pairingtypes.RelayReply{ + Data: retData, + }, } return reply, nil } -func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { +func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { // Get node rpcInputMessage := chainMessage.GetRPCMessage() @@ -732,8 +736,12 @@ func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, return nil, "", nil, err } - reply := &pairingtypes.RelayReply{ - Data: retData, + reply := &RelayReplyWrapper{ + StatusCode: http.StatusOK, // status code is used only for rest at the moment + + RelayReply: &pairingtypes.RelayReply{ + Data: retData, + }, } if ch != nil { diff --git a/protocol/chainlib/jsonRPC_test.go b/protocol/chainlib/jsonRPC_test.go index b8556a3ce3..850aa78b08 100644 --- a/protocol/chainlib/jsonRPC_test.go +++ b/protocol/chainlib/jsonRPC_test.go @@ -180,7 +180,7 @@ func TestAddonAndVerifications(t *testing.T) { require.NoError(t, err) reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, []string{verification.Extension}) require.NoError(t, err) - _, err = FormatResponseForParsing(reply, chainMessage) + _, err = FormatResponseForParsing(reply.RelayReply, chainMessage) require.NoError(t, err) } if closeServer != nil { @@ -293,7 +293,7 @@ func TestJsonRpcBatchCall(t *testing.T) { require.True(t, gotCalled) require.NoError(t, err) require.NotNil(t, relayReply) - require.Equal(t, response, string(relayReply.Data)) + require.Equal(t, response, string(relayReply.RelayReply.Data)) defer func() { if closeServer != nil { closeServer() @@ -334,7 +334,7 @@ func TestJsonRpcBatchCallSameID(t *testing.T) { require.True(t, gotCalled) require.NoError(t, err) require.NotNil(t, relayReply) - require.Equal(t, responseExpected, string(relayReply.Data)) + require.Equal(t, responseExpected, string(relayReply.RelayReply.Data)) defer func() { if closeServer != nil { closeServer() diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index c74897fae3..900bcef87f 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -454,7 +454,7 @@ func NewRestChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav return rcp, nil } -func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { +func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { if ch != nil { return nil, "", nil, utils.LavaFormatError("Subscribe is not allowed on rest", nil) } @@ -538,15 +538,18 @@ func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, return nil, "", nil, err } - reply := &pairingtypes.RelayReply{ - Data: body, - Metadata: convertToMetadataMapOfSlices(res.Header), + reply := &RelayReplyWrapper{ + StatusCode: res.StatusCode, + RelayReply: &pairingtypes.RelayReply{ + Data: body, + Metadata: convertToMetadataMapOfSlices(res.Header), + }, } // checking if rest reply data is in json format - err = rcp.HandleJSONFormatError(reply.Data) + err = rcp.HandleJSONFormatError(reply.RelayReply.Data) if err != nil { - return nil, "", nil, utils.LavaFormatError("Rest reply is neither a JSON object nor a JSON array of objects", nil, utils.Attribute{Key: "reply.Data", Value: string(reply.Data)}) + return nil, "", nil, utils.LavaFormatError("Rest reply is neither a JSON object nor a JSON array of objects", nil, utils.Attribute{Key: "reply.Data", Value: string(reply.RelayReply.Data)}) } return reply, "", nil, nil diff --git a/protocol/chainlib/rest_test.go b/protocol/chainlib/rest_test.go index 73bac82797..e3d435af70 100644 --- a/protocol/chainlib/rest_test.go +++ b/protocol/chainlib/rest_test.go @@ -212,7 +212,7 @@ func TestParsingRequestedBlocksHeadersRest(t *testing.T) { require.Equal(t, test.requestedBlock, latestReqBlock) reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) require.NoError(t, err) - parserInput, err := FormatResponseForParsing(reply, chainMessage) + parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) require.NoError(t, err) blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing) require.NoError(t, err) @@ -286,7 +286,7 @@ func TestSettingRequestedBlocksHeadersRest(t *testing.T) { require.Equal(t, test.block, latestReqBlock) // expected behavior is that it doesn't change the original requested block reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) require.NoError(t, err) - parserInput, err := FormatResponseForParsing(reply, chainMessage) + parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) require.NoError(t, err) blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing) require.NoError(t, err) diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 3cac7517d5..6e3de70824 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -641,7 +641,7 @@ func (cp *tendermintRpcChainProxy) addHttpConnector(ctx context.Context, nConns return nil } -func (cp *tendermintRpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { +func (cp *tendermintRpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { rpcInputMessage := chainMessage.GetRPCMessage() nodeMessage, ok := rpcInputMessage.(*rpcInterfaceMessages.TendermintrpcMessage) if !ok { @@ -663,7 +663,7 @@ func (cp *tendermintRpcChainProxy) SendNodeMsg(ctx context.Context, ch chan inte return cp.SendRPC(ctx, nodeMessage, ch, chainMessage) } -func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpcInterfaceMessages.TendermintrpcMessage, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { +func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpcInterfaceMessages.TendermintrpcMessage, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { // check if the input channel is not nil if ch != nil { // return an error if the channel is not nil @@ -731,21 +731,24 @@ func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpc } // create a new relay reply struct with the response body as the data - reply := &pairingtypes.RelayReply{ - Data: body, + reply := &RelayReplyWrapper{ + StatusCode: http.StatusOK, // status code is used only for rest at the moment + RelayReply: &pairingtypes.RelayReply{ + Data: body, + }, } // checking if rest reply data is in json format - err = cp.HandleJSONFormatError(reply.Data) + err = cp.HandleJSONFormatError(reply.RelayReply.Data) if err != nil { - return nil, "", nil, utils.LavaFormatError("Tendermint reply is neither a JSON object nor a JSON array of objects", nil, utils.Attribute{Key: "reply.Data", Value: string(reply.Data)}) + return nil, "", nil, utils.LavaFormatError("Tendermint reply is neither a JSON object nor a JSON array of objects", nil, utils.Attribute{Key: "reply.Data", Value: string(reply.RelayReply.Data)}) } return reply, "", nil, nil } // SendRPC sends Tendermint HTTP or WebSockets call -func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *rpcInterfaceMessages.TendermintrpcMessage, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { +func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *rpcInterfaceMessages.TendermintrpcMessage, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { // Get rpc connection from the connection pool var rpc *rpcclient.Client if ch != nil { @@ -837,8 +840,11 @@ func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *rpc } // create a new relay reply struct - reply := &pairingtypes.RelayReply{ - Data: data, + reply := &RelayReplyWrapper{ + StatusCode: http.StatusOK, // status code is used only for rest at the moment + RelayReply: &pairingtypes.RelayReply{ + Data: data, + }, } if ch != nil { diff --git a/protocol/chainlib/tendermintRPC_test.go b/protocol/chainlib/tendermintRPC_test.go index a48752b4bc..aaa24f0847 100644 --- a/protocol/chainlib/tendermintRPC_test.go +++ b/protocol/chainlib/tendermintRPC_test.go @@ -195,7 +195,7 @@ func TestTendermintRpcBatchCall(t *testing.T) { require.True(t, gotCalled) require.NoError(t, err) require.NotNil(t, relayReply) - require.Equal(t, response, string(relayReply.Data)) + require.Equal(t, response, string(relayReply.RelayReply.Data)) defer func() { if closeServer != nil { closeServer() @@ -237,7 +237,7 @@ func TestTendermintRpcBatchCallWithSameID(t *testing.T) { require.True(t, gotCalled) require.NoError(t, err) require.NotNil(t, relayReply) - require.Equal(t, response, string(relayReply.Data)) + require.Equal(t, response, string(relayReply.RelayReply.Data)) defer func() { if closeServer != nil { closeServer() diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 3a23695de9..7851fd4b4a 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -685,7 +685,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( copyReply := &pairingtypes.RelayReply{} copyReplyErr := protocopy.DeepCopyProtoObject(localRelayResult.Reply, copyReply) // set cache in a non blocking call - + statusCode := localRelayResult.StatusCode requestedBlock := localRelayResult.Request.RelayData.RequestBlock // get requested block before removing it from the data seenBlock := localRelayResult.Request.RelayData.SeenBlock // get seen block before removing it from the data hashKey, _, hashErr := chainlib.HashCacheRequest(localRelayResult.Request.RelayData, chainId) // get the hash (this changes the data) @@ -708,6 +708,9 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease) defer cancel() _, averageBlockTime, _, _ := rpccs.chainParser.ChainBlockStats() + // we don't want to cache node errors for too long. what can happen is a finalized block gets an error + // and we cache it for a long period of time. + isNodeError, _ := chainMessage.CheckResponseError(copyReply.Data, statusCode) err2 := rpccs.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ RequestHash: hashKey, @@ -720,6 +723,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( OptionalMetadata: nil, SharedStateId: sharedStateId, AverageBlockTime: int64(averageBlockTime), // by using average block time we can set longer TTL + IsNodeError: isNodeError, }) if err2 != nil { utils.LavaFormatWarning("error updating cache with new entry", err2) diff --git a/protocol/rpcprovider/reliabilitymanager/reliability_manager.go b/protocol/rpcprovider/reliabilitymanager/reliability_manager.go index 48e302cfde..9ecad9fbde 100644 --- a/protocol/rpcprovider/reliabilitymanager/reliability_manager.go +++ b/protocol/rpcprovider/reliabilitymanager/reliability_manager.go @@ -111,11 +111,15 @@ func (rm *ReliabilityManager) VoteHandler(voteParams *VoteParams, nodeHeight uin utils.Attribute{Key: "voteID", Value: voteID}, utils.Attribute{Key: "chainID", Value: voteParams.ChainID}) } // TODO: get extensions and addons from the request - reply, _, _, _, _, err := rm.chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) + replyWrapper, _, _, _, _, err := rm.chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) if err != nil { return utils.LavaFormatError("vote relay send has failed", err, utils.Attribute{Key: "ApiURL", Value: voteParams.ApiURL}, utils.Attribute{Key: "RequestData", Value: voteParams.RequestData}) } + if replyWrapper == nil || replyWrapper.RelayReply == nil { + return utils.LavaFormatError("vote relay send has failed, relayWrapper is nil", nil, utils.Attribute{Key: "ApiURL", Value: voteParams.ApiURL}, utils.Attribute{Key: "RequestData", Value: voteParams.RequestData}) + } + reply := replyWrapper.RelayReply reply.Metadata, _, _ = rm.chainParser.HandleHeaders(reply.Metadata, chainMessage.GetApiCollection(), spectypes.Header_pass_reply) nonce := rand.Int63() relayData := BuildRelayDataFromVoteParams(voteParams) diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index f4f9c396b3..105da03f0d 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -378,10 +378,14 @@ func (rpcps *RPCProviderServer) TryRelaySubscribe(ctx context.Context, requestBl var clientSub *rpcclient.ClientSubscription var subscriptionID string subscribeRepliesChan := make(chan interface{}) - reply, subscriptionID, clientSub, _, _, err := rpcps.chainRouter.SendNodeMsg(ctx, subscribeRepliesChan, chainMessage, nil) + replyWrapper, subscriptionID, clientSub, _, _, err := rpcps.chainRouter.SendNodeMsg(ctx, subscribeRepliesChan, chainMessage, nil) if err != nil { return false, utils.LavaFormatError("Subscription failed", err, utils.Attribute{Key: "GUID", Value: ctx}) } + if replyWrapper == nil || replyWrapper.RelayReply == nil { + return false, utils.LavaFormatError("Subscription failed, relayWrapper or RelayReply are nil", nil, utils.Attribute{Key: "GUID", Value: ctx}) + } + reply = replyWrapper.RelayReply reply.Metadata, _, _ = rpcps.chainParser.HandleHeaders(reply.Metadata, chainMessage.GetApiCollection(), spectypes.Header_pass_reply) if clientSub == nil { // failed subscription, but not an error. (probably a node error) @@ -759,11 +763,16 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty if debugConsistency { utils.LavaFormatDebug("adding stickiness header", utils.LogAttr("tokenFromContext", common.GetTokenFromGrpcContext(ctx)), utils.LogAttr("unique_token", common.GetUniqueToken(consumerAddr.String(), common.GetIpFromGrpcContext(ctx)))) } - - reply, _, _, _, _, err = rpcps.chainRouter.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions) + var replyWrapper *chainlib.RelayReplyWrapper + replyWrapper, _, _, _, _, err = rpcps.chainRouter.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions) if err != nil { return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) } + if replyWrapper == nil || replyWrapper.RelayReply == nil { + return nil, utils.LavaFormatError("Relay Wrapper returned nil without an error", nil, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) + } + + reply = replyWrapper.RelayReply if debugLatency { utils.LavaFormatDebug("node reply received", utils.Attribute{Key: "timeTaken", Value: time.Since(sendTime)}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) } @@ -775,6 +784,9 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty hashKey, _, hashErr := chainlib.HashCacheRequest(request.RelayData, rpcps.rpcProviderEndpoint.ChainID) // get the hash (this changes the data) copyReply := &pairingtypes.RelayReply{} copyReplyErr := protocopy.DeepCopyProtoObject(reply, copyReply) + + // get status code to decide if its a node error + statusCode := replyWrapper.StatusCode go func() { if hashErr != nil || copyReplyErr != nil { utils.LavaFormatError("Failed copying relay private data on TryRelay", nil, utils.LogAttr("copyReplyErr", copyReplyErr), utils.LogAttr("hashErr", hashErr)) @@ -787,6 +799,10 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty utils.LavaFormatError("TryRelay failed calculating hash for cach.SetEntry", err) return } + // in case the error is a node error we don't want to cache the response for a long period of time + // so users wont get errors if the error was temporary + isNodeError, _ := chainMsg.CheckResponseError(copyReply.Data, statusCode) + err = cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ RequestHash: hashKey, RequestedBlock: requestedBlock, @@ -797,6 +813,7 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty OptionalMetadata: ignoredMetadata, AverageBlockTime: int64(averageBlockTime), SeenBlock: latestBlock, + IsNodeError: isNodeError, }) if err != nil && request.RelaySession.Epoch != spectypes.NOT_APPLICABLE { utils.LavaFormatWarning("error updating cache with new entry", err, utils.Attribute{Key: "GUID", Value: ctx}) diff --git a/x/pairing/types/relayCache.pb.go b/x/pairing/types/relayCache.pb.go index 35665e71bf..9c5ac5d51b 100644 --- a/x/pairing/types/relayCache.pb.go +++ b/x/pairing/types/relayCache.pb.go @@ -297,6 +297,7 @@ type RelayCacheSet struct { ChainId string `protobuf:"bytes,9,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` SeenBlock int64 `protobuf:"varint,10,opt,name=seen_block,json=seenBlock,proto3" json:"seen_block,omitempty"` AverageBlockTime int64 `protobuf:"varint,11,opt,name=average_block_time,json=averageBlockTime,proto3" json:"average_block_time,omitempty"` + IsNodeError bool `protobuf:"varint,12,opt,name=is_node_error,json=isNodeError,proto3" json:"is_node_error,omitempty"` } func (m *RelayCacheSet) Reset() { *m = RelayCacheSet{} } @@ -402,6 +403,13 @@ func (m *RelayCacheSet) GetAverageBlockTime() int64 { return 0 } +func (m *RelayCacheSet) GetIsNodeError() bool { + if m != nil { + return m.IsNodeError + } + return false +} + func init() { proto.RegisterType((*CacheRelayReply)(nil), "lavanet.lava.pairing.CacheRelayReply") proto.RegisterType((*CacheUsage)(nil), "lavanet.lava.pairing.CacheUsage") @@ -415,47 +423,49 @@ func init() { } var fileDescriptor_36fbab536e2bbad1 = []byte{ - // 638 bytes of a gzipped FileDescriptorProto + // 662 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xb6, 0x9b, 0x34, 0x3f, 0x93, 0x96, 0x96, 0x55, 0x85, 0x42, 0x68, 0x8d, 0x31, 0xea, 0xcf, - 0x01, 0xd9, 0x52, 0x91, 0x38, 0x71, 0x80, 0x52, 0xd4, 0x56, 0xa2, 0x12, 0x38, 0x20, 0x21, 0x2e, - 0xd1, 0x26, 0x9e, 0xda, 0x2b, 0x1c, 0xdb, 0x78, 0xb7, 0x15, 0xe5, 0x29, 0x78, 0x1d, 0xc4, 0x0b, - 0xf4, 0xd8, 0x23, 0x27, 0x84, 0xda, 0xa7, 0x80, 0x13, 0xf2, 0xc4, 0x49, 0x93, 0xc8, 0x8d, 0x2a, - 0xc1, 0xc9, 0xde, 0x6f, 0xbf, 0xd9, 0x99, 0xf9, 0xbe, 0xd1, 0xc0, 0x7a, 0xc8, 0x4f, 0x78, 0x84, - 0xca, 0xc9, 0xbe, 0x4e, 0xc2, 0x45, 0x2a, 0x22, 0xdf, 0x49, 0x31, 0xe4, 0xa7, 0x2f, 0x78, 0x2f, - 0x40, 0x3b, 0x49, 0x63, 0x15, 0xb3, 0x95, 0x9c, 0x66, 0x67, 0x5f, 0x3b, 0xa7, 0xb5, 0x56, 0xfc, - 0xd8, 0x8f, 0x89, 0xe0, 0x64, 0x7f, 0x03, 0x6e, 0xcb, 0xbc, 0xfe, 0xc9, 0x9c, 0x71, 0xcf, 0x8f, - 0x63, 0x3f, 0x44, 0x87, 0x4e, 0xdd, 0xe3, 0x23, 0x07, 0xfb, 0x89, 0xca, 0x2f, 0xad, 0xef, 0x3a, - 0x2c, 0x51, 0x6a, 0x37, 0x8b, 0x70, 0x31, 0x09, 0x4f, 0xd9, 0x13, 0x98, 0x4f, 0xb3, 0x9f, 0xa6, - 0x6e, 0xea, 0x5b, 0x8d, 0x6d, 0xd3, 0x2e, 0x2a, 0xc7, 0xbe, 0x0a, 0x70, 0x07, 0x74, 0xf6, 0x06, - 0x6e, 0xc7, 0x89, 0x12, 0x71, 0xc4, 0xc3, 0x4e, 0x1f, 0x15, 0xf7, 0xb8, 0xe2, 0xcd, 0x39, 0xb3, - 0xb4, 0xd5, 0xd8, 0x36, 0x8a, 0xdf, 0x38, 0xcc, 0x59, 0x3b, 0xe5, 0xb3, 0x9f, 0xf7, 0x35, 0x77, - 0x79, 0x18, 0x3e, 0xc4, 0xd9, 0x1a, 0x80, 0x44, 0x8c, 0x3a, 0xdd, 0x30, 0xee, 0x7d, 0x6c, 0x96, - 0x4c, 0x7d, 0xab, 0xe4, 0xd6, 0x33, 0x64, 0x27, 0x03, 0xac, 0x57, 0x00, 0x54, 0xfc, 0x3b, 0xc9, - 0x7d, 0x64, 0xab, 0x50, 0xa7, 0xd3, 0xbe, 0x50, 0x92, 0x6a, 0x2f, 0xbb, 0x57, 0x00, 0x33, 0xa1, - 0x41, 0x87, 0x43, 0x21, 0x25, 0xca, 0xe6, 0x1c, 0xdd, 0x8f, 0x43, 0x56, 0x30, 0x8c, 0xe7, 0x32, - 0x60, 0xcf, 0xa0, 0x9a, 0xe2, 0xa7, 0x63, 0x94, 0x2a, 0x97, 0x61, 0x63, 0x86, 0x0c, 0xaf, 0x53, - 0x71, 0xc2, 0x15, 0xee, 0x72, 0xc5, 0xdd, 0x61, 0x18, 0xbb, 0x0b, 0xb5, 0x5e, 0xc0, 0x45, 0xd4, - 0x11, 0x1e, 0x65, 0xab, 0xbb, 0x55, 0x3a, 0x1f, 0x78, 0xd6, 0x1f, 0x1d, 0x16, 0xdd, 0x91, 0xeb, - 0x7b, 0xa8, 0xd8, 0x03, 0x58, 0xc8, 0xe3, 0x3a, 0x01, 0x97, 0x01, 0xe5, 0x5c, 0x70, 0x1b, 0x39, - 0x46, 0x15, 0xad, 0x01, 0x90, 0x0c, 0x03, 0xc2, 0x1c, 0x11, 0xea, 0x84, 0xd0, 0xf5, 0x2a, 0xd4, - 0x8f, 0x44, 0xc4, 0x43, 0xf1, 0x05, 0x3d, 0x52, 0xaa, 0xe6, 0x5e, 0x01, 0x6c, 0x13, 0x96, 0xf2, - 0xb7, 0xd0, 0xcb, 0xd5, 0x2c, 0x93, 0x9a, 0xb7, 0x46, 0x30, 0x49, 0xca, 0x36, 0x60, 0x49, 0x06, - 0x3c, 0x45, 0xaf, 0x23, 0x15, 0x57, 0x98, 0x15, 0x3f, 0x4f, 0xc5, 0x2f, 0x0e, 0xe0, 0x76, 0x86, - 0x1e, 0x78, 0x13, 0xdd, 0x55, 0x26, 0xba, 0x9b, 0x32, 0xad, 0x3a, 0x6d, 0xda, 0xb7, 0xd2, 0x78, - 0xf3, 0xed, 0xff, 0xd2, 0xfc, 0x53, 0xa8, 0xa5, 0x28, 0x93, 0x38, 0x92, 0x48, 0xbd, 0xdf, 0x64, - 0x6a, 0x47, 0x11, 0x93, 0xd2, 0x95, 0xa7, 0xa5, 0x2b, 0x1c, 0xeb, 0xf9, 0x7f, 0x1a, 0xeb, 0x02, - 0x91, 0x2b, 0x45, 0x22, 0x17, 0xb8, 0x56, 0x2d, 0x74, 0x6d, 0xdc, 0x8d, 0xfa, 0x2c, 0x37, 0x60, - 0xca, 0x0d, 0xf6, 0x08, 0x18, 0x3f, 0xc1, 0x94, 0xfb, 0x38, 0x60, 0x74, 0x94, 0xe8, 0x63, 0xb3, - 0x41, 0xb4, 0xe5, 0xfc, 0x86, 0x98, 0x6f, 0x45, 0x1f, 0xb7, 0x7f, 0xeb, 0xb0, 0x40, 0x12, 0x62, - 0x4a, 0xee, 0xb1, 0xf7, 0x50, 0xdb, 0x43, 0x45, 0x10, 0x7b, 0x38, 0x43, 0xf2, 0xe1, 0xa0, 0xb7, - 0xd6, 0x8b, 0x49, 0x53, 0x3b, 0xc8, 0xd2, 0xd8, 0x01, 0xd4, 0xda, 0x37, 0x7e, 0xb9, 0x8d, 0xaa, - 0x75, 0xc7, 0x1e, 0x2c, 0x3a, 0x7b, 0xb8, 0xe8, 0xec, 0x97, 0xd9, 0xa2, 0xb3, 0x34, 0xb6, 0x0b, - 0x95, 0x7d, 0xe4, 0xa1, 0x0a, 0xd8, 0x35, 0x9c, 0x96, 0x39, 0xa3, 0x2a, 0x5a, 0x2e, 0x96, 0xb6, - 0xf3, 0xfc, 0xec, 0xc2, 0xd0, 0xcf, 0x2f, 0x0c, 0xfd, 0xd7, 0x85, 0xa1, 0x7f, 0xbd, 0x34, 0xb4, - 0xf3, 0x4b, 0x43, 0xfb, 0x71, 0x69, 0x68, 0x1f, 0x36, 0x7d, 0xa1, 0x82, 0xe3, 0xae, 0xdd, 0x8b, - 0xfb, 0xce, 0xc4, 0x3a, 0xfe, 0x3c, 0x5a, 0xc8, 0xea, 0x34, 0x41, 0xd9, 0xad, 0x50, 0xda, 0xc7, - 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x81, 0x66, 0x88, 0xc1, 0x08, 0x06, 0x00, 0x00, + 0x10, 0xb6, 0x9b, 0x34, 0x3f, 0x9b, 0x94, 0x96, 0x55, 0x85, 0x42, 0x68, 0x8d, 0x31, 0xea, 0xcf, + 0x01, 0xd9, 0x52, 0x91, 0x38, 0x71, 0x80, 0xd2, 0xaa, 0xad, 0x44, 0x11, 0x38, 0x20, 0x21, 0x2e, + 0xd6, 0x26, 0x9e, 0xda, 0x2b, 0x1c, 0xaf, 0xf1, 0x6e, 0x2b, 0xca, 0x53, 0xf0, 0x3e, 0xbc, 0x40, + 0x8f, 0x3d, 0x72, 0x01, 0xa1, 0xf6, 0x29, 0xe0, 0x84, 0x3c, 0x71, 0xd2, 0x24, 0x72, 0xa3, 0x4a, + 0x70, 0xf2, 0xee, 0xb7, 0xdf, 0xec, 0xcc, 0x7c, 0xdf, 0x7a, 0xc8, 0x5a, 0xc4, 0x4e, 0x58, 0x0c, + 0xca, 0xc9, 0xbe, 0x4e, 0xc2, 0x78, 0xca, 0xe3, 0xc0, 0x49, 0x21, 0x62, 0xa7, 0x2f, 0x58, 0x2f, + 0x04, 0x3b, 0x49, 0x85, 0x12, 0x74, 0x39, 0xa7, 0xd9, 0xd9, 0xd7, 0xce, 0x69, 0xed, 0xe5, 0x40, + 0x04, 0x02, 0x09, 0x4e, 0xb6, 0x1a, 0x70, 0xdb, 0xe6, 0xf5, 0x57, 0xe6, 0x8c, 0x7b, 0x81, 0x10, + 0x41, 0x04, 0x0e, 0xee, 0xba, 0xc7, 0x47, 0x0e, 0xf4, 0x13, 0x95, 0x1f, 0x5a, 0xdf, 0x74, 0xb2, + 0x88, 0xa9, 0xdd, 0x2c, 0xc2, 0x85, 0x24, 0x3a, 0xa5, 0x4f, 0xc8, 0x7c, 0x9a, 0x2d, 0x5a, 0xba, + 0xa9, 0x6f, 0x36, 0xb6, 0x4c, 0xbb, 0xa8, 0x1c, 0xfb, 0x2a, 0xc0, 0x1d, 0xd0, 0xe9, 0x1b, 0x72, + 0x5b, 0x24, 0x8a, 0x8b, 0x98, 0x45, 0x5e, 0x1f, 0x14, 0xf3, 0x99, 0x62, 0xad, 0x39, 0xb3, 0xb4, + 0xd9, 0xd8, 0x32, 0x8a, 0xef, 0x38, 0xcc, 0x59, 0xdb, 0xe5, 0xb3, 0x9f, 0xf7, 0x35, 0x77, 0x69, + 0x18, 0x3e, 0xc4, 0xe9, 0x2a, 0x21, 0x12, 0x20, 0xf6, 0xba, 0x91, 0xe8, 0x7d, 0x6c, 0x95, 0x4c, + 0x7d, 0xb3, 0xe4, 0xd6, 0x33, 0x64, 0x3b, 0x03, 0xac, 0x97, 0x84, 0x60, 0xf1, 0xef, 0x24, 0x0b, + 0x80, 0xae, 0x90, 0x3a, 0xee, 0xf6, 0xb9, 0x92, 0x58, 0x7b, 0xd9, 0xbd, 0x02, 0xa8, 0x49, 0x1a, + 0xb8, 0x39, 0xe4, 0x52, 0x82, 0x6c, 0xcd, 0xe1, 0xf9, 0x38, 0x64, 0x85, 0xc3, 0x78, 0x26, 0x43, + 0xfa, 0x8c, 0x54, 0x53, 0xf8, 0x74, 0x0c, 0x52, 0xe5, 0x32, 0xac, 0xcf, 0x90, 0xe1, 0x75, 0xca, + 0x4f, 0x98, 0x82, 0x1d, 0xa6, 0x98, 0x3b, 0x0c, 0xa3, 0x77, 0x49, 0xad, 0x17, 0x32, 0x1e, 0x7b, + 0xdc, 0xc7, 0x6c, 0x75, 0xb7, 0x8a, 0xfb, 0x03, 0xdf, 0xfa, 0xa3, 0x93, 0x05, 0x77, 0xe4, 0xfa, + 0x1e, 0x28, 0xfa, 0x80, 0x34, 0xf3, 0x38, 0x2f, 0x64, 0x32, 0xc4, 0x9c, 0x4d, 0xb7, 0x91, 0x63, + 0x58, 0xd1, 0x2a, 0x21, 0x28, 0xc3, 0x80, 0x30, 0x87, 0x84, 0x3a, 0x22, 0x78, 0xbc, 0x42, 0xea, + 0x47, 0x3c, 0x66, 0x11, 0xff, 0x02, 0x3e, 0x2a, 0x55, 0x73, 0xaf, 0x00, 0xba, 0x41, 0x16, 0xf3, + 0xbb, 0xc0, 0xcf, 0xd5, 0x2c, 0xa3, 0x9a, 0xb7, 0x46, 0x30, 0x4a, 0x4a, 0xd7, 0xc9, 0xa2, 0x0c, + 0x59, 0x0a, 0xbe, 0x27, 0x15, 0x53, 0x90, 0x15, 0x3f, 0x8f, 0xc5, 0x2f, 0x0c, 0xe0, 0x4e, 0x86, + 0x1e, 0xf8, 0x13, 0xdd, 0x55, 0x26, 0xba, 0x9b, 0x32, 0xad, 0x3a, 0x6d, 0xda, 0x8f, 0xd2, 0x78, + 0xf3, 0x9d, 0xff, 0xd2, 0xfc, 0x53, 0x52, 0x4b, 0x41, 0x26, 0x22, 0x96, 0x80, 0xbd, 0xdf, 0xe4, + 0xd5, 0x8e, 0x22, 0x26, 0xa5, 0x2b, 0x4f, 0x4b, 0x57, 0xf8, 0xac, 0xe7, 0xff, 0xe9, 0x59, 0x17, + 0x88, 0x5c, 0x29, 0x12, 0xb9, 0xc0, 0xb5, 0x6a, 0xa1, 0x6b, 0xe3, 0x6e, 0xd4, 0x67, 0xb9, 0x41, + 0xa6, 0xdc, 0xa0, 0x8f, 0x08, 0x65, 0x27, 0x90, 0xb2, 0x00, 0x06, 0x0c, 0x4f, 0xf1, 0x3e, 0xb4, + 0x1a, 0x48, 0x5b, 0xca, 0x4f, 0x90, 0xf9, 0x96, 0xf7, 0x81, 0x5a, 0x64, 0x81, 0x4b, 0x2f, 0x16, + 0x3e, 0x78, 0x90, 0xa6, 0x22, 0x6d, 0x35, 0x51, 0xad, 0x06, 0x97, 0xaf, 0x84, 0x0f, 0xbb, 0x19, + 0xb4, 0xf5, 0x5b, 0x27, 0x4d, 0x94, 0x19, 0x52, 0x74, 0x98, 0xbe, 0x27, 0xb5, 0x3d, 0x50, 0x08, + 0xd1, 0x87, 0x33, 0x6c, 0x19, 0xfe, 0x0c, 0xed, 0xb5, 0x62, 0xd2, 0xd4, 0x9c, 0xb2, 0x34, 0x7a, + 0x40, 0x6a, 0x9d, 0x1b, 0xdf, 0xdc, 0x01, 0xd5, 0xbe, 0x63, 0x0f, 0x86, 0xa1, 0x3d, 0x1c, 0x86, + 0xf6, 0x6e, 0x36, 0x0c, 0x2d, 0x8d, 0xee, 0x90, 0xca, 0x3e, 0xb0, 0x48, 0x85, 0xf4, 0x1a, 0x4e, + 0xdb, 0x9c, 0x51, 0x15, 0x0e, 0x20, 0x4b, 0xdb, 0x7e, 0x7e, 0x76, 0x61, 0xe8, 0xe7, 0x17, 0x86, + 0xfe, 0xeb, 0xc2, 0xd0, 0xbf, 0x5e, 0x1a, 0xda, 0xf9, 0xa5, 0xa1, 0x7d, 0xbf, 0x34, 0xb4, 0x0f, + 0x1b, 0x01, 0x57, 0xe1, 0x71, 0xd7, 0xee, 0x89, 0xbe, 0x33, 0x31, 0xb2, 0x3f, 0x8f, 0x86, 0xb6, + 0x3a, 0x4d, 0x40, 0x76, 0x2b, 0x98, 0xf6, 0xf1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa3, 0x1c, + 0xe5, 0x7f, 0x2c, 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -830,6 +840,16 @@ func (m *RelayCacheSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.IsNodeError { + i-- + if m.IsNodeError { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x60 + } if m.AverageBlockTime != 0 { i = encodeVarintRelayCache(dAtA, i, uint64(m.AverageBlockTime)) i-- @@ -1055,6 +1075,9 @@ func (m *RelayCacheSet) Size() (n int) { if m.AverageBlockTime != 0 { n += 1 + sovRelayCache(uint64(m.AverageBlockTime)) } + if m.IsNodeError { + n += 2 + } return n } @@ -1957,6 +1980,26 @@ func (m *RelayCacheSet) Unmarshal(dAtA []byte) error { break } } + case 12: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IsNodeError", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelayCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.IsNodeError = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRelayCache(dAtA[iNdEx:])