Skip to content

Commit

Permalink
feat: PRT - adding relay processor retry options
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Aug 26, 2024
1 parent 395a054 commit bc2afb7
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 25 deletions.
6 changes: 5 additions & 1 deletion protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
// Disable relay retries when we get node errors.
// This feature is supposed to help with successful relays on chains that return node errors due to rare race conditions on the serviced chains.
DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error"
SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error"
DisableCacheOnNodeErrorFlag = "disable-cache-on-node-error"
UseOfflineSpecFlag = "use-offline-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain
)

Expand All @@ -56,7 +58,9 @@ type ConsumerCmdFlags struct {
DebugRelays bool // enables debug mode for relays
DisableConflictTransactions bool // disable conflict transactions
DisableRetryOnNodeErrors bool // disable retries on node errors
OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain.
SetRelayCountOnNodeError int
DisableCacheOnNodeError bool
OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain.
}

// default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time.
Expand Down
19 changes: 13 additions & 6 deletions protocol/rpcconsumer/relay_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,14 @@ type RelayProcessor struct {
allowSessionDegradation uint32 // used in the scenario where extension was previously used.
metricsInf MetricsInterface
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter
disableRelayRetry bool
relayRetriesManager *RelayRetriesManager
retryOptions retryProcessorOptions
}

// retryProcessorOptions groups the settings a RelayProcessor consults in
// shouldRetryRelay when deciding whether to retry a relay after node errors.
type retryProcessorOptions struct {
// relayCountOnNodeError is the limit the node-error count is compared against;
// a retry is only considered while nodeErrors <= relayCountOnNodeError.
relayCountOnNodeError int
// disableCacheOnNodeError, when true, bypasses the relayRetriesManager hash
// lookup, so a hash that is already cached does not block a retry.
disableCacheOnNodeError bool
// disableRelayRetry, when true, turns off retries on node errors entirely.
disableRelayRetry bool
}

func NewRelayProcessor(
Expand All @@ -74,7 +80,7 @@ func NewRelayProcessor(
debugRelay bool,
metricsInf MetricsInterface,
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter,
disableRelayRetry bool,
retryOptions retryProcessorOptions,
relayRetriesManager *RelayRetriesManager,
) *RelayProcessor {
guid, _ := utils.GetUniqueIdentifier(ctx)
Expand All @@ -100,7 +106,7 @@ func NewRelayProcessor(
debugRelay: debugRelay,
metricsInf: metricsInf,
chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter,
disableRelayRetry: disableRelayRetry,
retryOptions: retryOptions,
relayRetriesManager: relayRetriesManager,
}
}
Expand Down Expand Up @@ -320,12 +326,13 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node
// 2. If we have 0 successful relays and we have only node errors.
// 3. Hash calculation was successful.
// 4. Number of node errors <= the configured retry count on node errors (retryOptions.relayCountOnNodeError).
if !rp.disableRelayRetry && resultsCount == 0 && hashErr == nil {
if nodeErrors <= NumberOfRetriesAllowedOnNodeErrors {
if !rp.retryOptions.disableRelayRetry && resultsCount == 0 && hashErr == nil {
if nodeErrors <= rp.retryOptions.relayCountOnNodeError {
// TODO: check chain message retry on archive. (this feature will be added in the generic parsers feature)

// Check if user specified to disable caching - OR
// Check whether the hash already exists in the cache; if it does, we don't want to retry
if !rp.relayRetriesManager.CheckHashInCache(hash) {
if rp.retryOptions.disableCacheOnNodeError || !rp.relayRetriesManager.CheckHashInCache(hash) {
// If we didn't find the hash in the hash map we can retry
utils.LavaFormatTrace("retrying on relay error", utils.LogAttr("retry_number", nodeErrors), utils.LogAttr("hash", hash))
go rp.metricsInf.SetNodeErrorAttemptMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface())
Expand Down
31 changes: 18 additions & 13 deletions protocol/rpcconsumer/relay_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
"github.com/stretchr/testify/require"
)

// retryOptionsTest is the shared retry configuration passed to NewRelayProcessor
// by the tests in this file: retries enabled, a node-error retry count of 2
// (the same value as the CLI flag default), and the retry hash cache enabled.
// NOTE(review): disableRelayRetry is false here. A test that needs retries
// disabled must pass its own retryProcessorOptions (or override the field) —
// confirm the test that previously set relayProcessor.disableRelayRetry = true
// still exercises the disabled-retry path.
var retryOptionsTest = retryProcessorOptions{
disableRelayRetry: false,
relayCountOnNodeError: 2,
disableCacheOnNodeError: false,
}

type relayProcessorMetricsMock struct{}

func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(chainId string, apiInterface string) {}
Expand Down Expand Up @@ -104,7 +110,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -146,7 +152,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -188,7 +194,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
// check hash map flow:
chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders = relayProcessor.GetUsedProviders()
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand All @@ -212,7 +218,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
// check hash map flow:
chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders = relayProcessor.GetUsedProviders()
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand All @@ -236,7 +242,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
// 4th relay, same inputs, this time a successful relay, should remove the hash from the map
chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders = relayProcessor.GetUsedProviders()
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -285,8 +291,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor.disableRelayRetry = true
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -323,7 +328,7 @@ func TestRelayProcessorTimeout(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -375,7 +380,7 @@ func TestRelayProcessorRetry(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -419,7 +424,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -464,7 +469,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -509,7 +514,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -553,7 +558,7 @@ func TestRelayProcessorLatest(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down
5 changes: 5 additions & 0 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
DebugRelays: viper.GetBool(DebugRelaysFlagName),
DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag),
DisableRetryOnNodeErrors: viper.GetBool(common.DisableRetryOnNodeErrorsFlag),
SetRelayCountOnNodeError: viper.GetInt(common.SetRelayCountOnNodeErrorFlag),
DisableCacheOnNodeError: viper.GetBool(common.DisableCacheOnNodeErrorFlag),
OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag),
}

Expand Down Expand Up @@ -634,6 +636,9 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider")
cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain")

cmdRPCConsumer.Flags().Int(common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
cmdRPCConsumer.Flags().Bool(common.DisableCacheOnNodeErrorFlag, false, "cancel the use of cache to block retry attempts that failed in the past, this will cause every node error to send multiple relays for ever including spam")

common.AddRollingLogConfig(cmdRPCConsumer)
return cmdRPCConsumer
}
Expand Down
14 changes: 9 additions & 5 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type RPCConsumerServer struct {
connectedSubscriptionsContexts map[string]*CancelableContextHolder
chainListener chainlib.ChainListener
connectedSubscriptionsLock sync.RWMutex
disableNodeErrorRetry bool
retryOptions retryProcessorOptions
relayRetriesManager *RelayRetriesManager
}

Expand Down Expand Up @@ -124,7 +124,11 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp
rpccs.debugRelays = cmdFlags.DebugRelays
rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder)
rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10)
rpccs.disableNodeErrorRetry = cmdFlags.DisableRetryOnNodeErrors
rpccs.retryOptions = retryProcessorOptions{
disableRelayRetry: cmdFlags.DisableRetryOnNodeErrors,
relayCountOnNodeError: cmdFlags.SetRelayCountOnNodeError,
disableCacheOnNodeError: cmdFlags.DisableCacheOnNodeError,
}
rpccs.relayRetriesManager = NewRelayRetriesManager()
rpccs.chainListener, err = chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData, consumerWsSubscriptionManager)
if err != nil {
Expand Down Expand Up @@ -226,7 +230,7 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay
func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, protocolMessage chainlib.ProtocolMessage) (bool, error) {
success := false
var err error
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager)
for i := 0; i < retries; i++ {
err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil)
if lavasession.PairingListEmptyError.Is(err) {
Expand Down Expand Up @@ -410,7 +414,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe
// make sure all of the child contexts are cancelled when we exit
ctx, cancel := context.WithCancel(ctx)
defer cancel()
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager)
var err error
// try sending a relay 3 times. if failed return the error
for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ {
Expand Down Expand Up @@ -1233,7 +1237,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context
if len(results) < 2 {
relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions)
protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData)
relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager)
relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager)
err := rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessorDataReliability, nil)
if err != nil {
return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability))
Expand Down

0 comments on commit bc2afb7

Please sign in to comment.