diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index d7daa95e59..389e323f08 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -32,6 +32,8 @@ const ( // Disable relay retries when we get node errors. // This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains. DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error" + SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error" + DisableCacheOnNodeErrorFlag = "disable-cache-on-node-error" UseOfflineSpecFlag = "use-offline-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain ) @@ -56,7 +58,9 @@ type ConsumerCmdFlags struct { DebugRelays bool // enables debug mode for relays DisableConflictTransactions bool // disable conflict transactions DisableRetryOnNodeErrors bool // disable retries on node errors - OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain. + SetRelayCountOnNodeError int + DisableCacheOnNodeError bool + OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain. } // default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time. diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index f18b276663..799d84caba 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -59,8 +59,14 @@ type RelayProcessor struct { allowSessionDegradation uint32 // used in the scenario where extension was previously used. metricsInf MetricsInterface chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter - disableRelayRetry bool relayRetriesManager *RelayRetriesManager + retryOptions relayProcessorRetryOptions +} + +type relayProcessorRetryOptions struct { + relayCountOnNodeError int + disableCacheOnNodeError bool + disableRelayRetry bool } func NewRelayProcessor( @@ -74,7 +80,7 @@ func NewRelayProcessor( debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, - disableRelayRetry bool, + retryOptions relayProcessorRetryOptions, relayRetriesManager *RelayRetriesManager, ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) @@ -100,7 +106,7 @@ func NewRelayProcessor( debugRelay: debugRelay, metricsInf: metricsInf, chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter, - disableRelayRetry: disableRelayRetry, + retryOptions: retryOptions, relayRetriesManager: relayRetriesManager, } } @@ -320,12 +326,13 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node // 2. If we have 0 successful relays and we have only node errors. // 3. Hash calculation was successful. // 4. Number of retries < NumberOfRetriesAllowedOnNodeErrors. - if !rp.disableRelayRetry && resultsCount == 0 && hashErr == nil { - if nodeErrors <= NumberOfRetriesAllowedOnNodeErrors { + if !rp.retryOptions.disableRelayRetry && resultsCount == 0 && hashErr == nil { + if nodeErrors <= rp.retryOptions.relayCountOnNodeError { // TODO: check chain message retry on archive. (this feature will be added in the generic parsers feature) + // Check if user specified to disable caching - OR // Check hash already exist, if it does, we don't want to retry - if !rp.relayRetriesManager.CheckHashInCache(hash) { + if rp.retryOptions.disableCacheOnNodeError || !rp.relayRetriesManager.CheckHashInCache(hash) { // If we didn't find the hash in the hash map we can retry utils.LavaFormatTrace("retrying on relay error", utils.LogAttr("retry_number", nodeErrors), utils.LogAttr("hash", hash)) go rp.metricsInf.SetNodeErrorAttemptMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index bb21a8eda3..44822f1dbe 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -16,6 +16,12 @@ import ( "github.com/stretchr/testify/require" ) +var retryOptionsTest = relayProcessorRetryOptions{ + disableRelayRetry: false, + relayCountOnNodeError: 2, + disableCacheOnNodeError: false, +} + type relayProcessorMetricsMock struct{} func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(chainId string, apiInterface string) {} @@ -104,7 +110,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -146,7 +152,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -188,7 +194,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -212,7 +218,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // check hash map flow: chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -236,7 +242,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { // 4th relay, same inputs, this time a successful relay, should remove the hash from the map chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders = relayProcessor.GetUsedProviders() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -285,8 +291,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) - relayProcessor.disableRelayRetry = true + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -323,7 +328,7 @@ func TestRelayProcessorTimeout(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -375,7 +380,7 @@ func TestRelayProcessorRetry(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -419,7 +424,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -464,7 +469,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -509,7 +514,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -553,7 +558,7 @@ func TestRelayProcessorLatest(t *testing.T) { require.NoError(t, err) chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) require.NoError(t, err) - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance) usedProviders := relayProcessor.GetUsedProviders() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index c5b5a32e5b..94fc853b6b 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -584,6 +584,8 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 DebugRelays: viper.GetBool(DebugRelaysFlagName), DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag), DisableRetryOnNodeErrors: viper.GetBool(common.DisableRetryOnNodeErrorsFlag), + SetRelayCountOnNodeError: viper.GetInt(common.SetRelayCountOnNodeErrorFlag), + DisableCacheOnNodeError: viper.GetBool(common.DisableCacheOnNodeErrorFlag), OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag), } @@ -634,6 +636,9 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, prevent the rpcconsumer trying a different provider") cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain") + cmdRPCConsumer.Flags().Int(common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") + cmdRPCConsumer.Flags().Bool(common.DisableCacheOnNodeErrorFlag, false, "cancel the use of cache to block retry attempts that failed in the past, this will cause every node error to send multiple relays for ever including spam") + common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 3ea5a21a36..c374acff31 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -74,7 +74,7 @@ type RPCConsumerServer struct { connectedSubscriptionsContexts map[string]*CancelableContextHolder chainListener chainlib.ChainListener connectedSubscriptionsLock sync.RWMutex - disableNodeErrorRetry bool + retryOptions relayProcessorRetryOptions relayRetriesManager *RelayRetriesManager } @@ -125,7 +125,11 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp rpccs.debugRelays = cmdFlags.DebugRelays rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder) rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10) - rpccs.disableNodeErrorRetry = cmdFlags.DisableRetryOnNodeErrors + rpccs.retryOptions = relayProcessorRetryOptions{ + disableRelayRetry: cmdFlags.DisableRetryOnNodeErrors, + relayCountOnNodeError: cmdFlags.SetRelayCountOnNodeError, + disableCacheOnNodeError: cmdFlags.DisableCacheOnNodeError, + } rpccs.relayRetriesManager = NewRelayRetriesManager() rpccs.chainListener, err = chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData, consumerWsSubscriptionManager) if err != nil { @@ -227,7 +231,7 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, protocolMessage chainlib.ProtocolMessage) (bool, error) { success := false var err error - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) usedProvidersResets := 1 for i := 0; i < retries; i++ { // Check if we even have enough providers to communicate with them all. @@ -419,7 +423,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) defer cancel() - relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(protocolMessage), rpccs.requiredResponses, protocolMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) var err error // try sending a relay 3 times. if failed return the error for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ { @@ -447,16 +451,16 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe utils.LavaFormatDebug("Relay initiated with the following timeout schedule", utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("newRelayTimeout", relayTimeout)) } - apiName := chainMessage.GetApi().Name + apiName := protocolMessage.GetApi().Name resetUsedOnce := true setArchiveOnSpecialApi := func() { if apiName == "tx" || apiName == "chunk" || apiName == "EXPERIMENTAL_tx_status" { archiveExtensionArray := []string{"archive"} - chainMessage.OverrideExtensions(archiveExtensionArray, rpccs.chainParser.ExtensionsParser()) - relayRequestData.Extensions = archiveExtensionArray + protocolMessage.OverrideExtensions(archiveExtensionArray, rpccs.chainParser.ExtensionsParser()) + protocolMessage.RelayPrivateData().Extensions = archiveExtensionArray if resetUsedOnce { resetUsedOnce = false - relayProcessor.usedProviders = lavasession.NewUsedProviders(directiveHeaders) + relayProcessor.usedProviders = lavasession.NewUsedProviders(protocolMessage) } } } @@ -1259,7 +1263,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context if len(results) < 2 { relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions) protocolMessage := chainlib.NewProtocolMessage(chainMessage, nil, relayRequestData) - relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager) + relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.retryOptions, rpccs.relayRetriesManager) err := rpccs.sendRelayToProvider(ctx, protocolMessage, dappID, consumerIp, relayProcessorDataReliability, nil) if err != nil { return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability))