Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into database-checkpoint…
Browse files Browse the repository at this point in the history
…-relayerid
  • Loading branch information
najeal committed Aug 9, 2024
2 parents 2ce8a63 + ea2ee6a commit 694a96e
Show file tree
Hide file tree
Showing 25 changed files with 1,791 additions and 548 deletions.
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,29 @@ The relayer consists of the following components:

### Data Flow

<div align="center">
<figure>
<img src="resources/relayer-diagram.png?raw=true"></img>
</div>
<figcaption>Figure 1: Relayer Data Flow</figcaption>
</figure>

### Processing Missed Blocks

On startup, the relayer will process any blocks that it missed while offline if `process-missed-blocks` is set to `true` in the configuration. For each configured `source-blockchain`, the starting block height is set as the _minimum_ block height that is stored in the relayer's database across keys that pertain to that blockchain. These keys correspond to distinct sending addresses (as specified in `source-blockchain.allowed-origin-sender-addresses`), meaning that on startup, the relayer will begin processing from the _minimum_ block height across all configured sending addresses for each `source-blockchain`. Note that an empty `source-blockchain.allowed-origin-sender-addresses` list is treated as its own distinct key. If no keys are found, then the relayer begins processing from the current chain tip.

Once the starting block height is calculated, all blocks between it and the current tip of the `source-blockchain` are processed according to the _current_ configuration rules.

_Note:_ Given these semantics for computing the starting block height, it's possible for blocks that have previously been ignored under different configuration options to be relayed on a subsequent run. For example, consider the following scenario consisting of three subsequent runs (see Figure 2 below):

- **Run 1**: Suppose that on Blockchain A we set `allowed-origin-sender-addresses=[0x1234]`, meaning that the relayer should _ignore_ messages sent by any other address. The relayer processes through block `100`, and that height is marked as processed for `0x1234`'s key.

- **Run 2**: The relayer is then restarted with `allowed-origin-sender-addresses=[0xabcd]`, replacing `0x1234`. A message is sent from address `0x1234` at block height `200`. The relayer will decide to ignore this message, and will mark block `200` as processed for `0xabcd`'s key.

- **Run 3**: The relayer is then restarted again with the original configuration of `allowed-origin-sender-addresses=[0x1234]`. The relayer will calculate the starting block height as `100`, and process blocks `100` through the current chain tip, reprocessing block `200` along the way. Instead of ignoring the message in this block, however, the relayer will relay it to the destination.

<figure>
<img src="resources/catch-up-example.png?raw=true"></img>
<figcaption>Figure 2: Processing Missed Blocks Example</figcaption>
</figure>

### API

Expand Down
10 changes: 10 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,13 @@ func (c *Config) GetWarpQuorum(blockchainID ids.ID) (WarpQuorum, error) {
}
return WarpQuorum{}, errFailedToGetWarpQuorum
}

// Config implements the peers.Config interface
func (c *Config) GetPChainAPI() *APIConfig {
return c.PChainAPI
}

// Config implements the peers.Config interface
func (c *Config) GetInfoAPI() *APIConfig {
return c.InfoAPI
}
34 changes: 29 additions & 5 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/awm-relayer/api"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
Expand All @@ -26,6 +27,8 @@ import (
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/relayer"
"github.com/ava-labs/awm-relayer/relayer/checkpoint"
"github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
sigAggMetrics "github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/ethclient"
Expand Down Expand Up @@ -134,11 +137,23 @@ func main() {
if logLevel <= logging.Debug {
networkLogLevel = logLevel
}
var trackedSubnets set.Set[ids.ID]
// trackedSubnets is no longer strictly required but keeping it here for now
// to keep full parity with existing AWM relayer for now
// TODO: remove this from here once trackedSubnets are no longer referenced
// by ping messages in avalanchego
for _, sourceBlockchain := range cfg.SourceBlockchains {
trackedSubnets.Add(sourceBlockchain.GetSubnetID())
}

network, err := peers.NewNetwork(
networkLogLevel,
prometheus.DefaultRegisterer,
trackedSubnets,
&cfg,
)
network.InitializeConnectionsAndCheckStake(&cfg)

if err != nil {
logger.Fatal("Failed to create app request network", zap.Error(err))
panic(err)
Expand Down Expand Up @@ -198,17 +213,26 @@ func main() {
panic(err)
}

signatureAggregator := aggregator.NewSignatureAggregator(
network,
logger,
sigAggMetrics.NewSignatureAggregatorMetrics(
prometheus.DefaultRegisterer,
),
messageCreator,
)

applicationRelayers, minHeights, err := createApplicationRelayers(
context.Background(),
logger,
relayerMetrics,
db,
ticker,
network,
messageCreator,
&cfg,
sourceClients,
destinationClients,
signatureAggregator,
)
if err != nil {
logger.Fatal("Failed to create application relayers", zap.Error(err))
Expand Down Expand Up @@ -333,10 +357,10 @@ func createApplicationRelayers(
db checkpoint.RelayerDatabase,
ticker *utils.Ticker,
network *peers.AppRequestNetwork,
messageCreator message.Creator,
cfg *config.Config,
sourceClients map[ids.ID]ethclient.Client,
destinationClients map[ids.ID]vms.DestinationClient,
signatureAggregator *aggregator.SignatureAggregator,
) (map[common.Hash]*relayer.ApplicationRelayer, map[ids.ID]uint64, error) {
applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer)
minHeights := make(map[ids.ID]uint64)
Expand All @@ -356,10 +380,10 @@ func createApplicationRelayers(
ticker,
*sourceBlockchain,
network,
messageCreator,
cfg,
currentHeight,
destinationClients,
signatureAggregator,
)
if err != nil {
logger.Error(
Expand Down Expand Up @@ -392,10 +416,10 @@ func createApplicationRelayersForSourceChain(
ticker *utils.Ticker,
sourceBlockchain config.SourceBlockchain,
network *peers.AppRequestNetwork,
messageCreator message.Creator,
cfg *config.Config,
currentHeight uint64,
destinationClients map[ids.ID]vms.DestinationClient,
signatureAggregator *aggregator.SignatureAggregator,
) (map[common.Hash]*relayer.ApplicationRelayer, uint64, error) {
// Create the ApplicationRelayers
logger.Info(
Expand Down Expand Up @@ -439,12 +463,12 @@ func createApplicationRelayersForSourceChain(
logger,
metrics,
network,
messageCreator,
relayerID,
destinationClients[relayerID.DestinationBlockchainID],
sourceBlockchain,
checkpointManager,
cfg,
signatureAggregator,
)
if err != nil {
logger.Error(
Expand Down
78 changes: 53 additions & 25 deletions peers/app_request_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ package peers

import (
"context"
"fmt"
"math/big"
"os"
"sync"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/network"
avagoCommon "github.com/ava-labs/avalanchego/snow/engine/common"
snowVdrs "github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
Expand All @@ -30,8 +34,8 @@ const (
)

type AppRequestNetwork struct {
Network network.Network
Handler *RelayerExternalHandler
network network.Network
handler *RelayerExternalHandler
infoAPI *InfoAPI
logger logging.Logger
lock *sync.Mutex
Expand All @@ -42,7 +46,8 @@ type AppRequestNetwork struct {
func NewNetwork(
logLevel logging.Level,
registerer prometheus.Registerer,
cfg *config.Config,
trackedSubnets set.Set[ids.ID],
cfg Config,
) (*AppRequestNetwork, error) {
logger := logging.NewLogger(
"awm-relayer-p2p",
Expand All @@ -63,7 +68,7 @@ func NewNetwork(
return nil, err
}

infoAPI, err := NewInfoAPI(cfg.InfoAPI)
infoAPI, err := NewInfoAPI(cfg.GetInfoAPI())
if err != nil {
logger.Error(
"Failed to create info API",
Expand All @@ -80,11 +85,6 @@ func NewNetwork(
return nil, err
}

var trackedSubnets set.Set[ids.ID]
for _, sourceBlockchain := range cfg.SourceBlockchains {
trackedSubnets.Add(sourceBlockchain.GetSubnetID())
}

testNetwork, err := network.NewTestNetwork(logger, networkID, snowVdrs.NewManager(), trackedSubnets, handler)
if err != nil {
logger.Error(
Expand All @@ -94,38 +94,47 @@ func NewNetwork(
return nil, err
}

validatorClient := validators.NewCanonicalValidatorClient(logger, cfg.PChainAPI)
validatorClient := validators.NewCanonicalValidatorClient(logger, cfg.GetPChainAPI())

arNetwork := &AppRequestNetwork{
Network: testNetwork,
Handler: handler,
network: testNetwork,
handler: handler,
infoAPI: infoAPI,
logger: logger,
lock: new(sync.Mutex),
validatorClient: validatorClient,
}
go logger.RecoverAndPanic(func() {
testNetwork.Dispatch()
})

return arNetwork, nil
}

// TODO: remove dependence on Relayer specific config since this is meant to be a generic AppRequestNetwork file
func (n *AppRequestNetwork) InitializeConnectionsAndCheckStake(cfg *config.Config) error {
// Manually connect to the validators of each of the source subnets.
// We return an error if we are unable to connect to sufficient stake on any of the subnets.
// Sufficient stake is determined by the Warp quora of the configured supported destinations,
// or if the subnet supports all destinations, by the quora of all configured destinations.
for _, sourceBlockchain := range cfg.SourceBlockchains {
if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID {
if err := arNetwork.connectToPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
return nil, err
if err := n.connectToPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
return fmt.Errorf(
"failed to connect to primary network peers: %w",
err,
)
}
} else {
if err := arNetwork.connectToNonPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
return nil, err
if err := n.connectToNonPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
return fmt.Errorf(
"failed to connect to non-primary network peers: %w",
err,
)
}
}
}

go logger.RecoverAndPanic(func() {
testNetwork.Dispatch()
})

return arNetwork, nil
return nil
}

// ConnectPeers connects the network to peers with the given nodeIDs.
Expand All @@ -135,7 +144,7 @@ func (n *AppRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[id
defer n.lock.Unlock()

// First, check if we are already connected to all the peers
connectedPeers := n.Network.PeerInfo(nodeIDs.List())
connectedPeers := n.network.PeerInfo(nodeIDs.List())
if len(connectedPeers) == nodeIDs.Len() {
return nodeIDs
}
Expand All @@ -160,7 +169,7 @@ func (n *AppRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[id
for _, peer := range peers {
if nodeIDs.Contains(peer.ID) {
trackedNodes.Add(peer.ID)
n.Network.ManuallyTrack(peer.ID, peer.PublicIP)
n.network.ManuallyTrack(peer.ID, peer.PublicIP)
if len(trackedNodes) == nodeIDs.Len() {
return trackedNodes
}
Expand All @@ -182,7 +191,7 @@ func (n *AppRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[id
)
} else {
trackedNodes.Add(apiNodeID)
n.Network.ManuallyTrack(apiNodeID, apiNodeIPPort)
n.network.ManuallyTrack(apiNodeID, apiNodeIPPort)
}
}

Expand Down Expand Up @@ -245,6 +254,25 @@ func (n *AppRequestNetwork) ConnectToCanonicalValidators(subnetID ids.ID) (*Conn
}, nil
}

func (n *AppRequestNetwork) Send(
msg message.OutboundMessage,
nodeIDs set.Set[ids.NodeID],
subnetID ids.ID,
allower subnets.Allower,
) set.Set[ids.NodeID] {
return n.network.Send(msg, avagoCommon.SendConfig{NodeIDs: nodeIDs}, subnetID, allower)
}

func (n *AppRequestNetwork) RegisterAppRequest(requestID ids.RequestID) {
n.handler.RegisterAppRequest(requestID)
}
func (n *AppRequestNetwork) RegisterRequestID(requestID uint32, numExpectedResponse int) chan message.InboundMessage {
return n.handler.RegisterRequestID(requestID, numExpectedResponse)
}
func (n *AppRequestNetwork) GetSubnetID(blockchainID ids.ID) (ids.ID, error) {
return n.validatorClient.GetSubnetID(context.Background(), blockchainID)
}

// Private helpers

// Connect to the validators of the source blockchain. For each destination blockchain,
Expand Down
12 changes: 12 additions & 0 deletions peers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package peers

import "github.com/ava-labs/awm-relayer/config"

// Config defines a common interface necessary for standing up an AppRequestNetwork.
type Config interface {
GetInfoAPI() *config.APIConfig
GetPChainAPI() *config.APIConfig
}
Loading

0 comments on commit 694a96e

Please sign in to comment.