update backoff mechanism + use it in additional places
najeal committed Dec 20, 2024
1 parent 430fcaf commit 43fc8b2
Showing 8 changed files with 97 additions and 110 deletions.
12 changes: 6 additions & 6 deletions messages/teleporter/message_handler.go
@@ -364,12 +364,12 @@ func (m *messageHandler) waitForReceipt(
destinationBlockchainID := destinationClient.DestinationBlockchainID()
callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer callCtxCancel()
receipt, err := utils.CallWithRetry[*types.Receipt](
callCtx,
func() (*types.Receipt, error) {
return destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash)
},
)
var receipt *types.Receipt
operation := func() (err error) {
receipt, err = destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash)
return err
}
err := utils.WithMaxRetries(operation, 30*time.Second, m.logger)
if err != nil {
m.logger.Error(
"Failed to get transaction receipt",
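Every call site migrated in this commit follows the same shape: a backoff.Operation in cenkalti/backoff/v4 takes no arguments and returns only an error, so any value produced by the retried call has to be captured through a closure into a variable declared outside the operation, as in the waitForReceipt change above. A minimal sketch of that pattern, using the new utils.WithMaxRetries helper with a hypothetical fetchReceipt stand-in (not part of this diff):

package example

import (
	"errors"
	"time"

	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/icm-services/utils"
)

// fetchReceipt is a hypothetical stand-in for an RPC call that can fail transiently.
func fetchReceipt() (string, error) {
	return "", errors.New("transient failure")
}

// RetryFetch shows the closure-capture pattern used by the migrated call sites.
func RetryFetch() (string, error) {
	var receipt string
	// The operation closes over `receipt`, so the value survives the retry loop.
	operation := func() (err error) {
		receipt, err = fetchReceipt()
		return err
	}
	// Retry with exponential backoff for at most 30 seconds of elapsed time.
	if err := utils.WithMaxRetries(operation, 30*time.Second, logging.NoLog{}); err != nil {
		return "", err
	}
	return receipt, nil
}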
38 changes: 18 additions & 20 deletions relayer/application_relayer.go
@@ -28,8 +28,7 @@ import (
)

const (
// Number of retries to collect signatures from validators
maxRelayerQueryAttempts = 5
retryMaxElapsedTime = 10 * time.Second
)

// Errors
@@ -271,25 +270,24 @@ func (r *ApplicationRelayer) createSignedMessage(
signedWarpMessageBytes hexutil.Bytes
err error
)
err = utils.WithMaxRetriesLog(
func() error {
return r.sourceWarpSignatureClient.CallContext(
context.Background(),
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
r.warpConfig.QuorumNumerator,
r.signingSubnetID.String(),
)
},
maxRelayerQueryAttempts,
r.logger,
"Failed to get aggregate signature from node endpoint.",
zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)
operation := func() error {
return r.sourceWarpSignatureClient.CallContext(
context.Background(),
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
r.warpConfig.QuorumNumerator,
r.signingSubnetID.String(),
)
}
err = utils.WithMaxRetries(operation, retryMaxElapsedTime, r.logger)
if err != nil {
r.logger.Error(
"Failed to get aggregate signature from node endpoint.",
zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)
return nil, errFailedToGetAggSig
}
warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes)
2 changes: 1 addition & 1 deletion relayer/message_coordinator.go
@@ -234,7 +234,7 @@ func (mc *MessageCoordinator) ProcessBlock(
zap.Stringer("blockchainID", blockchainID),
)
// Parse the logs in the block, and group by application relayer
block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient)
block, err := relayerTypes.NewWarpBlockInfo(mc.logger, blockHeader, ethClient)
if err != nil {
mc.logger.Error("Failed to create Warp block info", zap.Error(err))
errChan <- err
43 changes: 23 additions & 20 deletions signature-aggregator/aggregator/aggregator.go
@@ -31,6 +31,7 @@ import (
"github.com/ava-labs/icm-services/signature-aggregator/metrics"
"github.com/ava-labs/icm-services/utils"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
"github.com/cenkalti/backoff/v4"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
@@ -42,7 +43,7 @@ const (
maxRelayerQueryAttempts = 10
// Maximum amount of time to spend waiting (in addition to network round trip time per attempt)
// during relayer signature query routine
signatureRequestRetryWaitPeriodMs = 20_000
signatureRequestMaxElapsedTime = 20 * time.Second
)

var (
@@ -224,12 +225,12 @@ func (s *SignatureAggregator) CreateSignedMessage(
return nil, fmt.Errorf("%s: %w", msg, err)
}

var signedMsg *avalancheWarp.Message
// Query the validators with retries. On each retry, query one node per unique BLS pubkey
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
operation := func() error {
responsesExpected := len(connectedValidators.ValidatorSet) - len(signatureMap)
s.logger.Debug(
"Aggregator collecting signatures from peers.",
zap.Int("attempt", attempt),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
zap.String("signingSubnetID", signingSubnet.String()),
zap.Int("validatorSetSize", len(connectedValidators.ValidatorSet)),
@@ -296,7 +297,8 @@ func (s *SignatureAggregator) CreateSignedMessage(
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
signedMsg, relevant, err := s.handleResponse(
var relevant bool
signedMsg, relevant, err = s.handleResponse(
response,
sentTo,
requestID,
@@ -309,10 +311,10 @@
if err != nil {
// don't increase node failures metric here, because we did
// it in handleResponse
return nil, fmt.Errorf(
return backoff.Permanent(fmt.Errorf(
"failed to handle response: %w",
err,
)
))
}
if relevant {
responseCount++
@@ -325,28 +327,29 @@
zap.Uint64("signatureWeight", accumulatedSignatureWeight.Uint64()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
return signedMsg, nil
return nil
}
// Break once we've had successful or unsuccessful responses from each requested node
if responseCount == responsesExpected {
break
}
}
}
if attempt != maxRelayerQueryAttempts {
// Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs
// TODO: We may want to consider an exponential back off rather than a uniform sleep period.
time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond)
}
return errNotEnoughSignatures
}
s.logger.Warn(
"Failed to collect a threshold of signatures",
zap.Int("attempts", maxRelayerQueryAttempts),
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
return nil, errNotEnoughSignatures

err = utils.WithMaxRetries(operation, signatureRequestMaxElapsedTime, s.logger)
if err != nil {
s.logger.Warn(
"Failed to collect a threshold of signatures",
zap.Int("attempts", maxRelayerQueryAttempts),
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
return nil, errNotEnoughSignatures
}
return signedMsg, nil
}

func (s *SignatureAggregator) getSubnetID(blockchainID ids.ID) (ids.ID, error) {
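A detail worth noting in the aggregator change: errors from handleResponse are wrapped with backoff.Permanent, which in cenkalti/backoff/v4 marks an error as non-retryable so the backoff loop stops immediately instead of spending the remaining time budget. A minimal sketch of the idea, with a hypothetical validate step standing in for the real response handling:

package example

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

var errBadInput = errors.New("bad input")

// validate is a hypothetical check whose failure should never be retried.
func validate(ok bool) error {
	if !ok {
		return errBadInput
	}
	return nil
}

// RetryUnlessPermanent retries the operation unless it hits a permanent error.
func RetryUnlessPermanent(ok bool) error {
	operation := func() error {
		if err := validate(ok); err != nil {
			// backoff.Permanent aborts the retry loop immediately.
			return backoff.Permanent(fmt.Errorf("failed validation: %w", err))
		}
		// Transiently failing work would go here and be retried on error.
		return nil
	}
	b := backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(5 * time.Second))
	return backoff.Retry(operation, b)
}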
22 changes: 12 additions & 10 deletions types/types.go
@@ -6,7 +6,9 @@ package types
import (
"context"
"errors"
"time"

"github.com/ava-labs/avalanchego/utils/logging"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/icm-services/utils"
"github.com/ava-labs/subnet-evm/core/types"
@@ -38,7 +40,7 @@ type WarpMessageInfo struct {
}

// Extract Warp logs from the block, if they exist
func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBlockInfo, error) {
func NewWarpBlockInfo(logger logging.Logger, header *types.Header, ethClient ethclient.Client) (*WarpBlockInfo, error) {
var (
logs []types.Log
err error
@@ -47,16 +49,16 @@ func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBl
if header.Bloom.Test(WarpPrecompileLogFilter[:]) {
cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout)
defer cancel()
logs, err = utils.CallWithRetry[[]types.Log](
cctx,
func() ([]types.Log, error) {
return ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{
Topics: [][]common.Hash{{WarpPrecompileLogFilter}},
Addresses: []common.Address{warp.ContractAddress},
FromBlock: header.Number,
ToBlock: header.Number,
})
operation := func() (err error) {
logs, err = ethClient.FilterLogs(cctx, interfaces.FilterQuery{
Topics: [][]common.Hash{{WarpPrecompileLogFilter}},
Addresses: []common.Address{warp.ContractAddress},
FromBlock: header.Number,
ToBlock: header.Number,
})
return err
}
err = utils.WithMaxRetries(operation, 5*time.Second, logger)
if err != nil {
return nil, err
}
35 changes: 8 additions & 27 deletions utils/backoff.go
@@ -5,40 +5,21 @@ import (

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/cenkalti/backoff/v4"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// WithMaxRetriesLog runs the operation until it succeeds or max retries has been reached.
// It uses exponential back off.
// It optionally logs information if logger is set.
func WithMaxRetriesLog(
// WithMaxRetries uses an exponential backoff to run the operation until it
// succeeds or max elapsed time has been reached.
func WithMaxRetries(
operation backoff.Operation,
max uint64,
maxElapsedTime time.Duration,
logger logging.Logger,
msg string,
fields ...zapcore.Field,
) error {
attempt := uint(1)
expBackOff := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), max)
expBackOff := backoff.NewExponentialBackOff(
backoff.WithMaxElapsedTime(maxElapsedTime),
)
notify := func(err error, duration time.Duration) {
if logger == nil {
return
}
fields := append(fields, zap.Uint("attempt", attempt), zap.Error(err), zap.Duration("backoff", duration))
logger.Warn(msg, fields...)
attempt++
logger.Warn("operation failed, retrying...")
}
err := backoff.RetryNotify(operation, expBackOff, notify)
if err != nil && logger != nil {
fields := append(fields, zap.Uint64("attempts", uint64(attempt)), zap.Error(err))
logger.Error(msg, fields...)
}
return err
}

// WithMaxRetries runs the operation until it succeeds or max retries has been reached.
// It uses exponential back off.
func WithMaxRetries(operation backoff.Operation, max uint64) error {
return WithMaxRetriesLog(operation, max, nil, "")
}
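The reworked helper changes the retry budget from a fixed attempt count to a wall-clock bound: once the elapsed time exceeds maxElapsedTime, NextBackOff returns backoff.Stop and the operation is not retried again. With the library's default settings (roughly a 500ms initial interval, 1.5x multiplier, and ±50% jitter; these are cenkalti/backoff/v4 defaults, not values from this diff), the number of attempts that fit inside a given budget varies slightly from run to run, which is why the updated tests below choose elapsed-time values that bound the number of tries rather than asserting an exact schedule. An illustrative sketch that prints one possible schedule:

package example

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// PrintSchedule prints successive wait intervals until the elapsed-time budget
// is exhausted and NextBackOff returns backoff.Stop.
func PrintSchedule(maxElapsedTime time.Duration) {
	b := backoff.NewExponentialBackOff(
		backoff.WithMaxElapsedTime(maxElapsedTime),
	)
	for {
		next := b.NextBackOff()
		if next == backoff.Stop {
			fmt.Println("budget exhausted, giving up")
			return
		}
		fmt.Printf("waiting %v before next attempt\n", next)
		time.Sleep(next) // elapsed time only advances as we actually wait
	}
}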
12 changes: 9 additions & 3 deletions utils/backoff_test.go
@@ -3,7 +3,9 @@ package utils
import (
"errors"
"testing"
"time"

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/stretchr/testify/require"
)

@@ -15,7 +17,9 @@ func TestWithMaxRetries(t *testing.T) {
_, err = retryable.Run()
return err
},
2,
// using default values: we want to run max 2 tries.
624*time.Millisecond,
logging.NoLog{},
)
require.Error(t, err)
})
@@ -27,7 +31,9 @@
res, err = retryable.Run()
return err
},
2,
// using default values we want to run 3 tries.
2000*time.Millisecond,
logging.NoLog{},
)
require.NoError(t, err)
require.True(t, res)
@@ -47,7 +53,7 @@ func newMockRetryableFn(trigger uint64) mockRetryableFn {
}

func (m *mockRetryableFn) Run() (bool, error) {
if m.counter == m.trigger {
if m.counter >= m.trigger {
return true, nil
}
m.counter++
43 changes: 20 additions & 23 deletions vms/evm/subscriber.go
@@ -21,9 +21,8 @@ import (
const (
// Max buffer size for ethereum subscription channels
maxClientSubscriptionBuffer = 20000
subscribeRetryTimeout = 1 * time.Second
MaxBlocksPerRequest = 200
rpcMaxTries = 5
retryMaxElapsedTime = 5 * time.Second
)

// subscriber implements Subscriber
@@ -129,18 +128,17 @@ func (s *subscriber) processBlockRange(
func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.Header, error) {
var err error
var header *types.Header
err = utils.WithMaxRetriesLog(
func() (err error) {
header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber)
return err
},
rpcMaxTries,
s.logger,
"Failed to get header by number",
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
operation := func() (err error) {
header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber)
return err
}
err = utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger)
if err != nil {
s.logger.Error(
"Failed to get header by number",
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
return nil, err
}
return header, nil
@@ -164,17 +162,16 @@ func (s *subscriber) Subscribe(maxResubscribeAttempts int) error {
// subscribe until it succeeds or reached maxSubscribeAttempts.
func (s *subscriber) subscribe(maxSubscribeAttempts uint64) error {
var sub interfaces.Subscription
err := utils.WithMaxRetriesLog(
func() (err error) {
sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers)
return err
},
maxSubscribeAttempts,
s.logger,
"Failed to subscribe to node",
zap.String("blockchainID", s.blockchainID.String()),
)
operation := func() (err error) {
sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers)
return err
}
err := utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger)
if err != nil {
s.logger.Error(
"Failed to subscribe to node",
zap.String("blockchainID", s.blockchainID.String()),
)
return err
}
s.sub = sub
