Skip to content

Commit

Permalink
fix: nits
Browse files Browse the repository at this point in the history
  • Loading branch information
najeal committed Dec 23, 2024
1 parent 58d18ec commit 89dc5b5
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 31 deletions.
13 changes: 8 additions & 5 deletions messages/teleporter/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ import (
"google.golang.org/grpc"
)

// The maximum gas limit that can be specified for a Teleporter message
// Based on the C-Chain 15_000_000 gas limit per block, with other Warp message gas overhead conservatively estimated.
const maxTeleporterGasLimit = 12_000_000
const (
// The maximum gas limit that can be specified for a Teleporter message
// Based on the C-Chain 15_000_000 gas limit per block, with other Warp message gas overhead conservatively estimated.
maxTeleporterGasLimit = 12_000_000
defaultBlockAcceptanceTimeout = 30 * time.Second
)

type factory struct {
messageConfig Config
Expand Down Expand Up @@ -362,14 +365,14 @@ func (m *messageHandler) waitForReceipt(
teleporterMessageID ids.ID,
) error {
destinationBlockchainID := destinationClient.DestinationBlockchainID()
callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
callCtx, callCtxCancel := context.WithTimeout(context.Background(), defaultBlockAcceptanceTimeout)
defer callCtxCancel()
var receipt *types.Receipt
operation := func() (err error) {
receipt, err = destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash)
return err
}
err := utils.WithMaxRetries(operation, 30*time.Second, m.logger)
err := utils.WithRetriesTimeout(m.logger, operation, defaultBlockAcceptanceTimeout)
if err != nil {
m.logger.Error(
"Failed to get transaction receipt",
Expand Down
2 changes: 1 addition & 1 deletion relayer/application_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (r *ApplicationRelayer) createSignedMessage(
r.signingSubnetID.String(),
)
}
err = utils.WithMaxRetries(operation, retryMaxElapsedTime, r.logger)
err = utils.WithRetriesTimeout(r.logger, operation, retryMaxElapsedTime)
if err != nil {
r.logger.Error(
"Failed to get aggregate signature from node endpoint.",
Expand Down
9 changes: 5 additions & 4 deletions relayer/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"math/big"
"math/rand"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
Expand All @@ -20,10 +21,10 @@ import (
)

const (
maxSubscribeAttempts = 10
retryMaxSubscribeElapsedTime = 10 * time.Second
// TODO attempt to resubscribe in perpetuity once we are able to process missed blocks and
// refresh the chain config on reconnect.
maxResubscribeAttempts = 10
retryMaxResubscribeElapsedTime = 10 * time.Second
)

// Listener handles all messages sent from a given source chain
Expand Down Expand Up @@ -137,7 +138,7 @@ func newListener(

// Open the subscription. We must do this before processing any missed messages, otherwise we may
// miss an incoming message in between fetching the latest block and subscribing.
err = lstnr.Subscriber.Subscribe(maxSubscribeAttempts)
err = lstnr.Subscriber.Subscribe(retryMaxSubscribeElapsedTime)
if err != nil {
logger.Error(
"Failed to subscribe to node",
Expand Down Expand Up @@ -228,7 +229,7 @@ func (lstnr *Listener) processLogs(ctx context.Context) error {
// Sets the listener health status to false while attempting to reconnect.
func (lstnr *Listener) reconnectToSubscriber() error {
// Attempt to reconnect the subscription
err := lstnr.Subscriber.Subscribe(maxResubscribeAttempts)
err := lstnr.Subscriber.Subscribe(retryMaxResubscribeElapsedTime)
if err != nil {
return fmt.Errorf("failed to resubscribe to node: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion signature-aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s *SignatureAggregator) CreateSignedMessage(
return errNotEnoughSignatures
}

err = utils.WithMaxRetries(operation, signatureRequestMaxElapsedTime, s.logger)
err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestMaxElapsedTime)
if err != nil {
s.logger.Warn(
"Failed to collect a threshold of signatures",
Expand Down
5 changes: 2 additions & 3 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package types
import (
"context"
"errors"
"time"

"github.com/ava-labs/avalanchego/utils/logging"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
Expand Down Expand Up @@ -47,7 +46,7 @@ func NewWarpBlockInfo(logger logging.Logger, header *types.Header, ethClient eth
)
// Check if the block contains warp logs, and fetch them from the client if it does
if header.Bloom.Test(WarpPrecompileLogFilter[:]) {
cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout)
cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCTimeout)
defer cancel()
operation := func() (err error) {
logs, err = ethClient.FilterLogs(cctx, interfaces.FilterQuery{
Expand All @@ -58,7 +57,7 @@ func NewWarpBlockInfo(logger logging.Logger, header *types.Header, ethClient eth
})
return err
}
err = utils.WithMaxRetries(operation, 5*time.Second, logger)
err = utils.WithRetriesTimeout(logger, operation, utils.DefaultRPCTimeout)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"github.com/cenkalti/backoff/v4"
)

// WithMaxRetries uses an exponential backoff to run the operation until it
// WithRetriesTimeout uses an exponential backoff to run the operation until it
// succeeds or max elapsed time has been reached.
func WithMaxRetries(
func WithRetriesTimeout(
logger logging.Logger,
operation backoff.Operation,
maxElapsedTime time.Duration,
logger logging.Logger,
) error {
expBackOff := backoff.NewExponentialBackOff(
backoff.WithMaxElapsedTime(maxElapsedTime),
Expand Down
8 changes: 4 additions & 4 deletions utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ import (
func TestWithMaxRetries(t *testing.T) {
t.Run("NotEnoughRetry", func(t *testing.T) {
retryable := newMockRetryableFn(3)
err := WithMaxRetries(
err := WithRetriesTimeout(
logging.NoLog{},
func() (err error) {
_, err = retryable.Run()
return err
},
// using default values: we want to run max 2 tries.
624*time.Millisecond,
logging.NoLog{},
)
require.Error(t, err)
})
t.Run("EnoughRetry", func(t *testing.T) {
retryable := newMockRetryableFn(2)
var res bool
err := WithMaxRetries(
err := WithRetriesTimeout(
logging.NoLog{},
func() (err error) {
res, err = retryable.Run()
return err
},
// using default values we want to run 3 tries.
2000*time.Millisecond,
logging.NoLog{},
)
require.NoError(t, err)
require.True(t, res)
Expand Down
2 changes: 1 addition & 1 deletion utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
)

const (
DefaultRPCRetryTimeout = 5 * time.Second
DefaultRPCTimeout = 5 * time.Second
)

//
Expand Down
16 changes: 8 additions & 8 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package evm

import (
"context"
"fmt"
"errors"
"math/big"
"time"

Expand All @@ -22,7 +22,7 @@ const (
// Max buffer size for ethereum subscription channels
maxClientSubscriptionBuffer = 20000
MaxBlocksPerRequest = 200
retryMaxElapsedTime = 5 * time.Second
resubscribeMaxElapsedTime = 5 * time.Second
)

// subscriber implements Subscriber
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.H
header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber)
return err
}
err = utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger)
err = utils.WithRetriesTimeout(s.logger, operation, utils.DefaultRPCTimeout)
if err != nil {
s.logger.Error(
"Failed to get header by number",
Expand All @@ -145,28 +145,28 @@ func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.H
}

// Loops forever iff maxResubscribeAttempts == 0
func (s *subscriber) Subscribe(maxResubscribeAttempts int) error {
func (s *subscriber) Subscribe(retryMaxElapsedTime time.Duration) error {
// Unsubscribe before resubscribing
// s.sub should only be nil on the first call to Subscribe
if s.sub != nil {
s.sub.Unsubscribe()
}

err := s.subscribe(uint64(maxResubscribeAttempts))
err := s.subscribe(retryMaxElapsedTime)
if err != nil {
return fmt.Errorf("failed to subscribe to node with all %d attempts", maxResubscribeAttempts)
return errors.New("failed to subscribe to node")
}
return nil
}

// subscribe until it succeeds or reached maxSubscribeAttempts.
func (s *subscriber) subscribe(maxSubscribeAttempts uint64) error {
func (s *subscriber) subscribe(retryMaxElapsedTime time.Duration) error {
var sub interfaces.Subscription
operation := func() (err error) {
sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers)
return err
}
err := utils.WithMaxRetries(operation, retryMaxElapsedTime, s.logger)
err := utils.WithRetriesTimeout(s.logger, operation, retryMaxElapsedTime)
if err != nil {
s.logger.Error(
"Failed to subscribe to node",
Expand Down
3 changes: 2 additions & 1 deletion vms/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package vms

import (
"math/big"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
Expand All @@ -25,7 +26,7 @@ type Subscriber interface {
// Subscribe registers a subscription. After Subscribe is called,
// log events that match [filter] are written to the channel returned
// by Logs
Subscribe(maxResubscribeAttempts int) error
Subscribe(retryMaxElapsedTime time.Duration) error

// Headers returns the channel that the subscription writes block headers to
Headers() <-chan *types.Header
Expand Down

0 comments on commit 89dc5b5

Please sign in to comment.