diff --git a/README.md b/README.md
index c10e44a2..5b72944c 100644
--- a/README.md
+++ b/README.md
@@ -316,9 +316,29 @@ The relayer consists of the following components:
### Data Flow
-
+
+ Figure 1: Relayer Data Flow
+
+
+### Processing Missed Blocks
+
+On startup, if `process-missed-blocks` is set to `true` in the configuration, the relayer processes any blocks that it missed while offline. For each configured `source-blockchain`, the starting block height is set to the _minimum_ block height stored in the relayer's database across all keys that pertain to that blockchain. These keys correspond to distinct sending addresses (as specified in `source-blockchain.allowed-origin-sender-addresses`), so on startup the relayer begins processing from the _minimum_ block height across all configured sending addresses for each `source-blockchain`. Note that an empty `source-blockchain.allowed-origin-sender-addresses` list is treated as its own distinct key. If no keys are found, the relayer begins processing from the current chain tip.
+
+Once the starting block height is calculated, all blocks between it and the current tip of the `source-blockchain` are processed according to the _current_ configuration rules.
+
+_Note:_ Given these semantics for computing the starting block height, messages in blocks that were ignored under a previous configuration may be relayed on a subsequent run. For example, consider the following scenario consisting of three consecutive runs (see Figure 2 below):
+
+- **Run 1**: Suppose that on Blockchain A we set `allowed-origin-sender-addresses=[0x1234]`, meaning that the relayer should _ignore_ messages sent by any other address. The relayer processes through block `100`, and that height is marked as processed for `0x1234`'s key.
+
+- **Run 2**: The relayer is then restarted with `allowed-origin-sender-addresses=[0xabcd]`, replacing `0x1234`. A message is sent from address `0x1234` at block height `200`. The relayer will decide to ignore this message, and will mark block `200` as processed for `0xabcd`'s key.
+
+- **Run 3**: The relayer is then restarted again with the original configuration of `allowed-origin-sender-addresses=[0x1234]`. The relayer will calculate the starting block height as `100`, and process blocks `100` through the current chain tip, reprocessing block `200` along the way. Instead of ignoring the message in this block, however, the relayer will relay it to the destination.
+
+
### API
diff --git a/config/config.go b/config/config.go
index b4b54f11..231c0795 100644
--- a/config/config.go
+++ b/config/config.go
@@ -246,3 +246,13 @@ func (c *Config) GetWarpQuorum(blockchainID ids.ID) (WarpQuorum, error) {
}
return WarpQuorum{}, errFailedToGetWarpQuorum
}
+
+// GetPChainAPI returns the P-Chain API configuration. It implements the peers.Config interface.
+func (c *Config) GetPChainAPI() *APIConfig {
+ return c.PChainAPI
+}
+
+// GetInfoAPI returns the Info API configuration. It implements the peers.Config interface.
+func (c *Config) GetInfoAPI() *APIConfig {
+ return c.InfoAPI
+}
diff --git a/main/main.go b/main/main.go
index 48494337..a6452742 100644
--- a/main/main.go
+++ b/main/main.go
@@ -17,6 +17,7 @@ import (
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
+ "github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/awm-relayer/api"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
@@ -26,6 +27,8 @@ import (
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/relayer"
"github.com/ava-labs/awm-relayer/relayer/checkpoint"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
+ sigAggMetrics "github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/ethclient"
@@ -134,11 +137,23 @@ func main() {
if logLevel <= logging.Debug {
networkLogLevel = logLevel
}
+ var trackedSubnets set.Set[ids.ID]
+ // trackedSubnets is no longer strictly required but keeping it here for now
+ // to keep full parity with existing AWM relayer for now
+ // TODO: remove this from here once trackedSubnets are no longer referenced
+ // by ping messages in avalanchego
+ for _, sourceBlockchain := range cfg.SourceBlockchains {
+ trackedSubnets.Add(sourceBlockchain.GetSubnetID())
+ }
+
network, err := peers.NewNetwork(
networkLogLevel,
prometheus.DefaultRegisterer,
+ trackedSubnets,
&cfg,
)
+ network.InitializeConnectionsAndCheckStake(&cfg)
+
if err != nil {
logger.Fatal("Failed to create app request network", zap.Error(err))
panic(err)
@@ -198,6 +213,15 @@ func main() {
panic(err)
}
+ signatureAggregator := aggregator.NewSignatureAggregator(
+ network,
+ logger,
+ sigAggMetrics.NewSignatureAggregatorMetrics(
+ prometheus.DefaultRegisterer,
+ ),
+ messageCreator,
+ )
+
applicationRelayers, minHeights, err := createApplicationRelayers(
context.Background(),
logger,
@@ -205,10 +229,10 @@ func main() {
db,
ticker,
network,
- messageCreator,
&cfg,
sourceClients,
destinationClients,
+ signatureAggregator,
)
if err != nil {
logger.Fatal("Failed to create application relayers", zap.Error(err))
@@ -333,10 +357,10 @@ func createApplicationRelayers(
db checkpoint.RelayerDatabase,
ticker *utils.Ticker,
network *peers.AppRequestNetwork,
- messageCreator message.Creator,
cfg *config.Config,
sourceClients map[ids.ID]ethclient.Client,
destinationClients map[ids.ID]vms.DestinationClient,
+ signatureAggregator *aggregator.SignatureAggregator,
) (map[common.Hash]*relayer.ApplicationRelayer, map[ids.ID]uint64, error) {
applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer)
minHeights := make(map[ids.ID]uint64)
@@ -356,10 +380,10 @@ func createApplicationRelayers(
ticker,
*sourceBlockchain,
network,
- messageCreator,
cfg,
currentHeight,
destinationClients,
+ signatureAggregator,
)
if err != nil {
logger.Error(
@@ -392,10 +416,10 @@ func createApplicationRelayersForSourceChain(
ticker *utils.Ticker,
sourceBlockchain config.SourceBlockchain,
network *peers.AppRequestNetwork,
- messageCreator message.Creator,
cfg *config.Config,
currentHeight uint64,
destinationClients map[ids.ID]vms.DestinationClient,
+ signatureAggregator *aggregator.SignatureAggregator,
) (map[common.Hash]*relayer.ApplicationRelayer, uint64, error) {
// Create the ApplicationRelayers
logger.Info(
@@ -439,12 +463,12 @@ func createApplicationRelayersForSourceChain(
logger,
metrics,
network,
- messageCreator,
relayerID,
destinationClients[relayerID.DestinationBlockchainID],
sourceBlockchain,
checkpointManager,
cfg,
+ signatureAggregator,
)
if err != nil {
logger.Error(
diff --git a/peers/app_request_network.go b/peers/app_request_network.go
index 4aba983d..651d0adb 100644
--- a/peers/app_request_network.go
+++ b/peers/app_request_network.go
@@ -5,14 +5,18 @@ package peers
import (
"context"
+ "fmt"
"math/big"
"os"
"sync"
"time"
"github.com/ava-labs/avalanchego/ids"
+ "github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/network"
+ avagoCommon "github.com/ava-labs/avalanchego/snow/engine/common"
snowVdrs "github.com/ava-labs/avalanchego/snow/validators"
+ "github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
@@ -30,8 +34,8 @@ const (
)
type AppRequestNetwork struct {
- Network network.Network
- Handler *RelayerExternalHandler
+ network network.Network
+ handler *RelayerExternalHandler
infoAPI *InfoAPI
logger logging.Logger
lock *sync.Mutex
@@ -42,7 +46,8 @@ type AppRequestNetwork struct {
func NewNetwork(
logLevel logging.Level,
registerer prometheus.Registerer,
- cfg *config.Config,
+ trackedSubnets set.Set[ids.ID],
+ cfg Config,
) (*AppRequestNetwork, error) {
logger := logging.NewLogger(
"awm-relayer-p2p",
@@ -63,7 +68,7 @@ func NewNetwork(
return nil, err
}
- infoAPI, err := NewInfoAPI(cfg.InfoAPI)
+ infoAPI, err := NewInfoAPI(cfg.GetInfoAPI())
if err != nil {
logger.Error(
"Failed to create info API",
@@ -80,11 +85,6 @@ func NewNetwork(
return nil, err
}
- var trackedSubnets set.Set[ids.ID]
- for _, sourceBlockchain := range cfg.SourceBlockchains {
- trackedSubnets.Add(sourceBlockchain.GetSubnetID())
- }
-
testNetwork, err := network.NewTestNetwork(logger, networkID, snowVdrs.NewManager(), trackedSubnets, handler)
if err != nil {
logger.Error(
@@ -94,38 +94,47 @@ func NewNetwork(
return nil, err
}
- validatorClient := validators.NewCanonicalValidatorClient(logger, cfg.PChainAPI)
+ validatorClient := validators.NewCanonicalValidatorClient(logger, cfg.GetPChainAPI())
arNetwork := &AppRequestNetwork{
- Network: testNetwork,
- Handler: handler,
+ network: testNetwork,
+ handler: handler,
infoAPI: infoAPI,
logger: logger,
lock: new(sync.Mutex),
validatorClient: validatorClient,
}
+ go logger.RecoverAndPanic(func() {
+ testNetwork.Dispatch()
+ })
+
+ return arNetwork, nil
+}
+// TODO: remove the dependence on the Relayer-specific config, since this is meant to be a generic AppRequestNetwork file.
+func (n *AppRequestNetwork) InitializeConnectionsAndCheckStake(cfg *config.Config) error {
// Manually connect to the validators of each of the source subnets.
// We return an error if we are unable to connect to sufficient stake on any of the subnets.
// Sufficient stake is determined by the Warp quora of the configured supported destinations,
// or if the subnet supports all destinations, by the quora of all configured destinations.
for _, sourceBlockchain := range cfg.SourceBlockchains {
if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID {
- if err := arNetwork.connectToPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
- return nil, err
+ if err := n.connectToPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
+ return fmt.Errorf(
+ "failed to connect to primary network peers: %w",
+ err,
+ )
}
} else {
- if err := arNetwork.connectToNonPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
- return nil, err
+ if err := n.connectToNonPrimaryNetworkPeers(cfg, sourceBlockchain); err != nil {
+ return fmt.Errorf(
+ "failed to connect to non-primary network peers: %w",
+ err,
+ )
}
}
}
-
- go logger.RecoverAndPanic(func() {
- testNetwork.Dispatch()
- })
-
- return arNetwork, nil
+ return nil
}
// ConnectPeers connects the network to peers with the given nodeIDs.
@@ -135,7 +144,7 @@ func (n *AppRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[id
defer n.lock.Unlock()
// First, check if we are already connected to all the peers
- connectedPeers := n.Network.PeerInfo(nodeIDs.List())
+ connectedPeers := n.network.PeerInfo(nodeIDs.List())
if len(connectedPeers) == nodeIDs.Len() {
return nodeIDs
}
@@ -160,7 +169,7 @@ func (n *AppRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[id
for _, peer := range peers {
if nodeIDs.Contains(peer.ID) {
trackedNodes.Add(peer.ID)
- n.Network.ManuallyTrack(peer.ID, peer.PublicIP)
+ n.network.ManuallyTrack(peer.ID, peer.PublicIP)
if len(trackedNodes) == nodeIDs.Len() {
return trackedNodes
}
@@ -182,7 +191,7 @@ func (n *AppRequestNetwork) ConnectPeers(nodeIDs set.Set[ids.NodeID]) set.Set[id
)
} else {
trackedNodes.Add(apiNodeID)
- n.Network.ManuallyTrack(apiNodeID, apiNodeIPPort)
+ n.network.ManuallyTrack(apiNodeID, apiNodeIPPort)
}
}
@@ -245,6 +254,25 @@ func (n *AppRequestNetwork) ConnectToCanonicalValidators(subnetID ids.ID) (*Conn
}, nil
}
+func (n *AppRequestNetwork) Send(
+ msg message.OutboundMessage,
+ nodeIDs set.Set[ids.NodeID],
+ subnetID ids.ID,
+ allower subnets.Allower,
+) set.Set[ids.NodeID] {
+ return n.network.Send(msg, avagoCommon.SendConfig{NodeIDs: nodeIDs}, subnetID, allower)
+}
+
+func (n *AppRequestNetwork) RegisterAppRequest(requestID ids.RequestID) {
+ n.handler.RegisterAppRequest(requestID)
+}
+func (n *AppRequestNetwork) RegisterRequestID(requestID uint32, numExpectedResponse int) chan message.InboundMessage {
+ return n.handler.RegisterRequestID(requestID, numExpectedResponse)
+}
+func (n *AppRequestNetwork) GetSubnetID(blockchainID ids.ID) (ids.ID, error) {
+ return n.validatorClient.GetSubnetID(context.Background(), blockchainID)
+}
+
// Private helpers
// Connect to the validators of the source blockchain. For each destination blockchain,
diff --git a/peers/config.go b/peers/config.go
new file mode 100644
index 00000000..4749d18a
--- /dev/null
+++ b/peers/config.go
@@ -0,0 +1,12 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package peers
+
+import "github.com/ava-labs/awm-relayer/config"
+
+// Config is the minimal configuration needed to stand up an AppRequestNetwork: Info API and P-Chain API settings.
+type Config interface {
+ GetInfoAPI() *config.APIConfig
+ GetPChainAPI() *config.APIConfig
+}
diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go
index 7412d7c7..9504672d 100644
--- a/relayer/application_relayer.go
+++ b/relayer/application_relayer.go
@@ -4,31 +4,20 @@
package relayer
import (
- "bytes"
"context"
"errors"
- "math/big"
- "math/rand"
- "sync"
"time"
"github.com/ava-labs/avalanchego/ids"
- "github.com/ava-labs/avalanchego/message"
- "github.com/ava-labs/avalanchego/proto/pb/p2p"
- avagoCommon "github.com/ava-labs/avalanchego/snow/engine/common"
- "github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/utils/constants"
- "github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/logging"
- "github.com/ava-labs/avalanchego/utils/set"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/messages"
"github.com/ava-labs/awm-relayer/peers"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
- coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
- msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
"github.com/ava-labs/subnet-evm/rpc"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -37,8 +26,6 @@ import (
"go.uber.org/zap"
)
-type blsSignatureBuf [bls.SignatureLen]byte
-
const (
// Number of retries to collect signatures from validators
maxRelayerQueryAttempts = 5
@@ -47,14 +34,8 @@ const (
signatureRequestRetryWaitPeriodMs = 10_000
)
-var (
- codec = msg.Codec
- coreEthCodec = coreEthMsg.Codec
- // Errors
- errNotEnoughSignatures = errors.New("failed to collect a threshold of signatures")
- errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint")
- errNotEnoughConnectedStake = errors.New("failed to connect to a threshold of stake")
-)
+// Errors
+var errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint")
// CheckpointManager stores committed heights in the database
type CheckpointManager interface {
@@ -74,28 +55,26 @@ type ApplicationRelayer struct {
logger logging.Logger
metrics *ApplicationRelayerMetrics
network *peers.AppRequestNetwork
- messageCreator message.Creator
sourceBlockchain config.SourceBlockchain
signingSubnetID ids.ID
destinationClient vms.DestinationClient
relayerID RelayerID
warpQuorum config.WarpQuorum
checkpointManager CheckpointManager
- currentRequestID uint32
- lock *sync.RWMutex
sourceWarpSignatureClient *rpc.Client // nil if configured to fetch signatures via AppRequest for the source blockchain
+ signatureAggregator *aggregator.SignatureAggregator
}
func NewApplicationRelayer(
logger logging.Logger,
metrics *ApplicationRelayerMetrics,
network *peers.AppRequestNetwork,
- messageCreator message.Creator,
relayerID RelayerID,
destinationClient vms.DestinationClient,
sourceBlockchain config.SourceBlockchain,
checkpointManager CheckpointManager,
cfg *config.Config,
+ signatureAggregator *aggregator.SignatureAggregator,
) (*ApplicationRelayer, error) {
quorum, err := cfg.GetWarpQuorum(relayerID.DestinationBlockchainID)
if err != nil {
@@ -142,16 +121,14 @@ func NewApplicationRelayer(
logger: logger,
metrics: metrics,
network: network,
- messageCreator: messageCreator,
sourceBlockchain: sourceBlockchain,
destinationClient: destinationClient,
relayerID: relayerID,
signingSubnetID: signingSubnet,
warpQuorum: quorum,
checkpointManager: checkpointManager,
- currentRequestID: rand.Uint32(), // TODONOW: pass via ctor
- lock: &sync.RWMutex{},
sourceWarpSignatureClient: warpClient,
+ signatureAggregator: signatureAggregator,
}
return &ar, nil
@@ -199,27 +176,8 @@ func (r *ApplicationRelayer) ProcessHeight(
// Relays a message to the destination chain. Does not checkpoint the height.
// returns the transaction hash if the message is successfully relayed.
func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) (common.Hash, error) {
- // Increment the request ID. Make sure we don't hold the lock while we relay the message.
- r.lock.Lock()
- r.currentRequestID++
- reqID := r.currentRequestID
- r.lock.Unlock()
-
- return r.relayMessage(reqID, handler)
-}
-
-func (r *ApplicationRelayer) RelayerID() RelayerID {
- return r.relayerID
-}
-
-// returns the transaction hash if the message is successfully relayed.
-func (r *ApplicationRelayer) relayMessage(
- requestID uint32,
- handler messages.MessageHandler,
-) (common.Hash, error) {
r.logger.Debug(
"Relaying message",
- zap.Uint32("requestID", requestID),
zap.String("sourceBlockchainID", r.sourceBlockchain.BlockchainID),
zap.String("relayerID", r.relayerID.ID.String()),
)
@@ -244,8 +202,12 @@ func (r *ApplicationRelayer) relayMessage(
// sourceWarpSignatureClient is nil iff the source blockchain is configured to fetch signatures via AppRequest
if r.sourceWarpSignatureClient == nil {
+ signedMessage, err = r.signatureAggregator.CreateSignedMessage(
+ unsignedMessage,
+ r.signingSubnetID,
+ r.warpQuorum.QuorumNumerator,
+ )
r.incFetchSignatureAppRequestCount()
- signedMessage, err = r.createSignedMessageAppRequest(unsignedMessage, requestID)
if err != nil {
r.logger.Error(
"Failed to create signed warp message via AppRequest network",
@@ -289,6 +251,10 @@ func (r *ApplicationRelayer) relayMessage(
return txHash, nil
}
+func (r *ApplicationRelayer) RelayerID() RelayerID {
+ return r.relayerID
+}
+
// createSignedMessage fetches the signed Warp message from the source chain via RPC.
// Each VM may implement their own RPC method to construct the aggregate signature, which
// will need to be accounted for here.
@@ -350,417 +316,6 @@ func (r *ApplicationRelayer) createSignedMessage(
return nil, errFailedToGetAggSig
}
-// createSignedMessageAppRequest collects signatures from nodes by directly querying them
-// via AppRequest, then aggregates the signatures, and constructs the signed warp message.
-func (r *ApplicationRelayer) createSignedMessageAppRequest(
- unsignedMessage *avalancheWarp.UnsignedMessage,
- requestID uint32,
-) (*avalancheWarp.Message, error) {
- r.logger.Info(
- "Fetching aggregate signature from the source chain validators via AppRequest",
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- connectedValidators, err := r.network.ConnectToCanonicalValidators(r.signingSubnetID)
- if err != nil {
- r.logger.Error(
- "Failed to connect to canonical validators",
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Error(err),
- )
- return nil, err
- }
- if !utils.CheckStakeWeightExceedsThreshold(
- big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
- connectedValidators.TotalValidatorWeight,
- r.warpQuorum.QuorumNumerator,
- r.warpQuorum.QuorumDenominator,
- ) {
- r.logger.Error(
- "Failed to connect to a threshold of stake",
- zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
- zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
- zap.Any("warpQuorum", r.warpQuorum),
- )
- return nil, errNotEnoughConnectedStake
- }
-
- // Make sure to use the correct codec
- var reqBytes []byte
- if r.sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID {
- req := coreEthMsg.MessageSignatureRequest{
- MessageID: unsignedMessage.ID(),
- }
- reqBytes, err = coreEthMsg.RequestToBytes(coreEthCodec, req)
- } else {
- req := msg.MessageSignatureRequest{
- MessageID: unsignedMessage.ID(),
- }
- reqBytes, err = msg.RequestToBytes(codec, req)
- }
- if err != nil {
- r.logger.Error(
- "Failed to marshal request bytes",
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Error(err),
- )
- return nil, err
- }
-
- // Construct the AppRequest
- outMsg, err := r.messageCreator.AppRequest(
- unsignedMessage.SourceChainID,
- requestID,
- peers.DefaultAppRequestTimeout,
- reqBytes,
- )
- if err != nil {
- r.logger.Error(
- "Failed to create app request message",
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Error(err),
- )
- return nil, err
- }
-
- // Query the validators with retries. On each retry, query one node per unique BLS pubkey
- accumulatedSignatureWeight := big.NewInt(0)
-
- signatureMap := make(map[int]blsSignatureBuf)
- for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
- responsesExpected := len(connectedValidators.ValidatorSet) - len(signatureMap)
- r.logger.Debug(
- "Relayer collecting signatures from peers.",
- zap.Int("attempt", attempt),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- zap.Int("validatorSetSize", len(connectedValidators.ValidatorSet)),
- zap.Int("signatureMapSize", len(signatureMap)),
- zap.Int("responsesExpected", responsesExpected),
- )
-
- vdrSet := set.NewSet[ids.NodeID](len(connectedValidators.ValidatorSet))
- for i, vdr := range connectedValidators.ValidatorSet {
- // If we already have the signature for this validator, do not query any of the composite nodes again
- if _, ok := signatureMap[i]; ok {
- continue
- }
-
- // TODO: Track failures and iterate through the validator's node list on subsequent query attempts
- nodeID := vdr.NodeIDs[0]
- vdrSet.Add(nodeID)
- r.logger.Debug(
- "Added node ID to query.",
- zap.String("nodeID", nodeID.String()),
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- )
-
- // Register a timeout response for each queried node
- reqID := ids.RequestID{
- NodeID: nodeID,
- SourceChainID: unsignedMessage.SourceChainID,
- DestinationChainID: unsignedMessage.SourceChainID,
- RequestID: requestID,
- Op: byte(message.AppResponseOp),
- }
- r.network.Handler.RegisterAppRequest(reqID)
- }
- responseChan := r.network.Handler.RegisterRequestID(requestID, vdrSet.Len())
-
- sentTo := r.network.Network.Send(
- outMsg,
- avagoCommon.SendConfig{NodeIDs: vdrSet},
- r.sourceBlockchain.GetSubnetID(),
- subnets.NoOpAllower,
- )
- r.logger.Debug(
- "Sent signature request to network",
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Any("sentTo", sentTo),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- for nodeID := range vdrSet {
- if !sentTo.Contains(nodeID) {
- r.logger.Warn(
- "Failed to make async request to node",
- zap.String("nodeID", nodeID.String()),
- zap.Error(err),
- )
- responsesExpected--
- }
- }
-
- responseCount := 0
- if responsesExpected > 0 {
- // Handle the responses. For each response, we need to call response.OnFinishedHandling() exactly once.
- // Wrap the loop body in an anonymous function so that we do so on each loop iteration
- for response := range responseChan {
- r.logger.Debug(
- "Processing response from node",
- zap.String("nodeID", response.NodeID().String()),
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- signedMsg, relevant, err := r.handleResponse(
- response,
- sentTo,
- requestID,
- connectedValidators,
- unsignedMessage,
- signatureMap,
- accumulatedSignatureWeight,
- )
- if err != nil {
- return nil, err
- }
- if relevant {
- responseCount++
- }
- // If we have sufficient signatures, return here.
- if signedMsg != nil {
- r.logger.Info(
- "Created signed message.",
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Uint64("signatureWeight", accumulatedSignatureWeight.Uint64()),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- return signedMsg, nil
- }
- // Break once we've had successful or unsuccessful responses from each requested node
- if responseCount == responsesExpected {
- break
- }
- }
- }
- if attempt != maxRelayerQueryAttempts {
- // Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs
- // TODO: We may want to consider an exponential back off rather than a uniform sleep period.
- time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond)
- }
- }
-
- r.logger.Warn(
- "Failed to collect a threshold of signatures",
- zap.Int("attempts", maxRelayerQueryAttempts),
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- return nil, errNotEnoughSignatures
-}
-
-// Attempts to create a signed warp message from the accumulated responses.
-// Returns a non-nil Warp message if [accumulatedSignatureWeight] exceeds the signature verification threshold.
-// Returns false in the second return parameter if the app response is not relevant to the current signature
-// aggregation request. Returns an error only if a non-recoverable error occurs, otherwise returns a nil error
-// to continue processing responses.
-func (r *ApplicationRelayer) handleResponse(
- response message.InboundMessage,
- sentTo set.Set[ids.NodeID],
- requestID uint32,
- connectedValidators *peers.ConnectedCanonicalValidators,
- unsignedMessage *avalancheWarp.UnsignedMessage,
- signatureMap map[int]blsSignatureBuf,
- accumulatedSignatureWeight *big.Int,
-) (*avalancheWarp.Message, bool, error) {
- // Regardless of the response's relevance, call it's finished handler once this function returns
- defer response.OnFinishedHandling()
-
- // Check if this is an expected response.
- m := response.Message()
- rcvReqID, ok := message.GetRequestID(m)
- if !ok {
- // This should never occur, since inbound message validity is already checked by the inbound handler
- r.logger.Error("Could not get requestID from message")
- return nil, false, nil
- }
- nodeID := response.NodeID()
- if !sentTo.Contains(nodeID) || rcvReqID != requestID {
- r.logger.Debug("Skipping irrelevant app response")
- return nil, false, nil
- }
-
- // If we receive an AppRequestFailed, then the request timed out.
- // This is still a relevant response, since we are no longer expecting a response from that node.
- if response.Op() == message.AppErrorOp {
- r.logger.Debug("Request timed out")
- return nil, true, nil
- }
-
- validator, vdrIndex := connectedValidators.GetValidator(nodeID)
- signature, valid := r.isValidSignatureResponse(unsignedMessage, response, validator.PublicKey)
- if valid {
- r.logger.Debug(
- "Got valid signature response",
- zap.String("nodeID", nodeID.String()),
- zap.Uint64("stakeWeight", validator.Weight),
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- signatureMap[vdrIndex] = signature
- accumulatedSignatureWeight.Add(accumulatedSignatureWeight, new(big.Int).SetUint64(validator.Weight))
- } else {
- r.logger.Debug(
- "Got invalid signature response",
- zap.String("nodeID", nodeID.String()),
- zap.Uint64("stakeWeight", validator.Weight),
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- return nil, true, nil
- }
-
- // As soon as the signatures exceed the stake weight threshold we try to aggregate and send the transaction.
- if utils.CheckStakeWeightExceedsThreshold(
- accumulatedSignatureWeight,
- connectedValidators.TotalValidatorWeight,
- r.warpQuorum.QuorumNumerator,
- r.warpQuorum.QuorumDenominator,
- ) {
- aggSig, vdrBitSet, err := r.aggregateSignatures(signatureMap)
- if err != nil {
- r.logger.Error(
- "Failed to aggregate signature.",
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Error(err),
- )
- return nil, true, err
- }
-
- signedMsg, err := avalancheWarp.NewMessage(
- unsignedMessage,
- &avalancheWarp.BitSetSignature{
- Signers: vdrBitSet.Bytes(),
- Signature: *(*[bls.SignatureLen]byte)(bls.SignatureToBytes(aggSig)),
- },
- )
- if err != nil {
- r.logger.Error(
- "Failed to create new signed message",
- zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- zap.String("warpMessageID", unsignedMessage.ID().String()),
- zap.Error(err),
- )
- return nil, true, err
- }
- return signedMsg, true, nil
- }
- // Not enough signatures, continue processing messages
- return nil, true, nil
-}
-
-// isValidSignatureResponse tries to generate a signature from the peer.AsyncResponse, then verifies
-// the signature against the node's public key. If we are unable to generate the signature or verify
-// correctly, false will be returned to indicate no valid signature was found in response.
-func (r *ApplicationRelayer) isValidSignatureResponse(
- unsignedMessage *avalancheWarp.UnsignedMessage,
- response message.InboundMessage,
- pubKey *bls.PublicKey,
-) (blsSignatureBuf, bool) {
- // If the handler returned an error response, count the response and continue
- if response.Op() == message.AppErrorOp {
- r.logger.Debug(
- "Relayer async response failed",
- zap.String("nodeID", response.NodeID().String()),
- )
- return blsSignatureBuf{}, false
- }
-
- appResponse, ok := response.Message().(*p2p.AppResponse)
- if !ok {
- r.logger.Debug(
- "Relayer async response was not an AppResponse",
- zap.String("nodeID", response.NodeID().String()),
- )
- return blsSignatureBuf{}, false
- }
-
- var sigResponse msg.SignatureResponse
- if _, err := msg.Codec.Unmarshal(appResponse.AppBytes, &sigResponse); err != nil {
- r.logger.Error(
- "Error unmarshaling signature response",
- zap.Error(err),
- )
- }
- signature := sigResponse.Signature
-
- // If the node returned an empty signature, then it has not yet seen the warp message. Retry later.
- emptySignature := blsSignatureBuf{}
- if bytes.Equal(signature[:], emptySignature[:]) {
- r.logger.Debug(
- "Response contained an empty signature",
- zap.String("nodeID", response.NodeID().String()),
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- return blsSignatureBuf{}, false
- }
-
- sig, err := bls.SignatureFromBytes(signature[:])
- if err != nil {
- r.logger.Debug(
- "Failed to create signature from response",
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- return blsSignatureBuf{}, false
- }
-
- if !bls.Verify(pubKey, sig, unsignedMessage.Bytes()) {
- r.logger.Debug(
- "Failed verification for signature",
- zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
- )
- return blsSignatureBuf{}, false
- }
-
- return signature, true
-}
-
-// aggregateSignatures constructs a BLS aggregate signature from the collected validator signatures. Also
-// returns a bit set representing the validators that are represented in the aggregate signature. The bit
-// set is in canonical validator order.
-func (r *ApplicationRelayer) aggregateSignatures(
- signatureMap map[int]blsSignatureBuf,
-) (*bls.Signature, set.Bits, error) {
- // Aggregate the signatures
- signatures := make([]*bls.Signature, 0, len(signatureMap))
- vdrBitSet := set.NewBits()
-
- for i, sigBytes := range signatureMap {
- sig, err := bls.SignatureFromBytes(sigBytes[:])
- if err != nil {
- r.logger.Error(
- "Failed to unmarshal signature",
- zap.Error(err),
- )
- return nil, set.Bits{}, err
- }
- signatures = append(signatures, sig)
- vdrBitSet.Add(i)
- }
-
- aggSig, err := bls.AggregateSignatures(signatures)
- if err != nil {
- r.logger.Error(
- "Failed to aggregate signatures",
- zap.Error(err),
- )
- return nil, set.Bits{}, err
- }
- return aggSig, vdrBitSet, nil
-}
-
//
// Metrics
//
diff --git a/resources/catch-up-example.png b/resources/catch-up-example.png
new file mode 100644
index 00000000..3ad3f16d
Binary files /dev/null and b/resources/catch-up-example.png differ
diff --git a/scripts/build.sh b/scripts/build.sh
index d4661991..bcea3cd6 100755
--- a/scripts/build.sh
+++ b/scripts/build.sh
@@ -1,56 +1,8 @@
-#!/usr/bin/env bash
-# Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
-# See the file LICENSE for licensing terms.
-
-set -o errexit
-set -o nounset
-set -o pipefail
-
-go_version() {
- go version | sed -nE -e 's/[^0-9.]+([0-9.]+).+/\1/p'
-}
-
-version_lt() {
- # Return true if $1 is a lower version than than $2,
- local ver1=$1
- local ver2=$2
- # Reverse sort the versions, if the 1st item != ver1 then ver1 < ver2
- if [[ $(echo -e -n "$ver1\n$ver2\n" | sort -rV | head -n1) != "$ver1" ]]; then
- return 0
- else
- return 1
- fi
-}
-
# Root directory
-RELAYER_PATH=$(
+root=$(
cd "$(dirname "${BASH_SOURCE[0]}")"
cd .. && pwd
)
-# Load the versions and constants
-source "$RELAYER_PATH"/scripts/versions.sh
-source "$RELAYER_PATH"/scripts/constants.sh
-
-go_version_minimum=$GO_VERSION
-
-if version_lt "$(go_version)" "$go_version_minimum"; then
- echo "awm-relayer requires Go >= $go_version_minimum, Go $(go_version) found." >&2
- exit 1
-fi
-
-scripts/protobuf_codegen.sh
-
-if [[ $# -eq 1 ]]; then
- binary_path=$1
-elif [[ $# -eq 0 ]]; then
- binary_path="build/awm-relayer"
-else
- echo "Invalid arguments to build awm-relayer. Requires zero (default location) or one argument to specify binary location."
- exit 1
-fi
-
-# Build AWM Relayer, which is run as a standalone process
-last_git_tag=$(git describe --tags --abbrev=0 2>/dev/null) || last_git_tag="v0.0.0-dev"
-echo "Building AWM Relayer Version: $last_git_tag at $binary_path"
-go build -ldflags "-X 'main.version=$last_git_tag'" -o "$binary_path" "main/"*.go
+"$root"/scripts/build_relayer.sh
+"$root"/scripts/build_signature_aggregator.sh
diff --git a/scripts/build_relayer.sh b/scripts/build_relayer.sh
new file mode 100755
index 00000000..8069a874
--- /dev/null
+++ b/scripts/build_relayer.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+# Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
+# See the file LICENSE for licensing terms.
+
+set -o errexit
+set -o nounset
+set -o pipefail
+
+go_version() {
+ go version | sed -nE -e 's/[^0-9.]+([0-9.]+).+/\1/p'
+}
+
+version_lt() {
+ # Return true if $1 is a lower version than than $2,
+ local ver1=$1
+ local ver2=$2
+ # Reverse sort the versions, if the 1st item != ver1 then ver1 < ver2
+ if [[ $(echo -e -n "$ver1\n$ver2\n" | sort -rV | head -n1) != "$ver1" ]]; then
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Root directory
+RELAYER_PATH=$(
+ cd "$(dirname "${BASH_SOURCE[0]}")"
+ cd .. && pwd
+)
+
+# Load the versions and constants
+source "$RELAYER_PATH"/scripts/versions.sh
+source "$RELAYER_PATH"/scripts/constants.sh
+
+go_version_minimum=$GO_VERSION
+
+if version_lt "$(go_version)" "$go_version_minimum"; then
+ echo "awm-relayer requires Go >= $go_version_minimum, Go $(go_version) found." >&2
+ exit 1
+fi
+
+# Run the remaining steps from the repository root: the protobuf codegen script and the
+# "main/"*.go glob passed to `go build` below are both relative paths, so without this the
+# script only works when invoked with the repository root as the working directory.
+# Note: a relative binary path argument is therefore resolved against the repository root.
+cd "$RELAYER_PATH"
+scripts/protobuf_codegen.sh
+
+if [[ $# -eq 1 ]]; then
+ binary_path=$1
+elif [[ $# -eq 0 ]]; then
+ binary_path=$relayer_path
+else
+ echo "Invalid arguments to build awm-relayer. Requires zero (default location) or one argument to specify binary location."
+ exit 1
+fi
+
+# Build AWM Relayer, which is run as a standalone process
+last_git_tag=$(git describe --tags --abbrev=0 2>/dev/null) || last_git_tag="v0.0.0-dev"
+echo "Building AWM Relayer Version: $last_git_tag at $binary_path"
+go build -ldflags "-X 'main.version=$last_git_tag'" -o "$binary_path" "main/"*.go
diff --git a/scripts/build_signature_aggregator.sh b/scripts/build_signature_aggregator.sh
new file mode 100755
index 00000000..09e58ae4
--- /dev/null
+++ b/scripts/build_signature_aggregator.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+# Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+# See the file LICENSE for licensing terms.
+
+set -o errexit
+set -o nounset
+set -o pipefail
+
+go_version() {
+ go version | sed -nE -e 's/[^0-9.]+([0-9.]+).+/\1/p'
+}
+
+version_lt() {
+ # Return true if $1 is a lower version than than $2,
+ local ver1=$1
+ local ver2=$2
+ # Reverse sort the versions, if the 1st item != ver1 then ver1 < ver2
+ if [[ $(echo -e -n "$ver1\n$ver2\n" | sort -rV | head -n1) != "$ver1" ]]; then
+ return 0
+ else
+ return 1
+ fi
+}
+
+# Signature Aggregator root directory
+SIGNATURE_AGGREGATOR_PATH=$(
+ cd "$(dirname "${BASH_SOURCE[0]}")"
+ cd ../signature-aggregator && pwd
+)
+
+BASE_PATH=$(
+ cd $SIGNATURE_AGGREGATOR_PATH
+ cd .. && pwd
+)
+
+source $BASE_PATH/scripts/constants.sh
+source $BASE_PATH/scripts/versions.sh
+
+go_version_minimum=$GO_VERSION
+
+if version_lt "$(go_version)" "$go_version_minimum"; then
+ echo "signature-aggregator requires Go >= $go_version_minimum, Go $(go_version) found." >&2
+ exit 1
+fi
+
+if [[ $# -eq 1 ]]; then
+ binary_path=$1
+elif [[ $# -eq 0 ]]; then
+ binary_path="$signature_aggregator_path"
+else
+ echo "Invalid arguments to build signature-aggregator. Requires zero (default location) or one argument to specify binary location."
+ exit 1
+fi
+
+# Build from the signature-aggregator directory so the "main/"*.go glob resolves to its
+# sources. Because of this cd, a relative binary path is resolved against that directory.
+cd "$SIGNATURE_AGGREGATOR_PATH"
+# Build the Signature Aggregator, which is run as a standalone process
+last_git_tag=$(git describe --tags --abbrev=0 2>/dev/null) || last_git_tag="v0.0.0-dev"
+echo "Building Signature Aggregator Version: $last_git_tag at $binary_path"
+go build -ldflags "-X 'main.version=$last_git_tag'" -o "$binary_path" "main/"*.go
diff --git a/scripts/constants.sh b/scripts/constants.sh
index adf237a1..ee1a483b 100755
--- a/scripts/constants.sh
+++ b/scripts/constants.sh
@@ -10,8 +10,14 @@ RELAYER_PATH=$(
cd .. && pwd
)
-# Where AWM Relayer binary goes
+SIGNATURE_AGGREGATOR_PATH=$(
+ cd "$(dirname "${BASH_SOURCE[0]}")"
+ cd ../signature-aggregator && pwd
+)
+
+# Where binaries go
relayer_path="$RELAYER_PATH/build/awm-relayer"
+signature_aggregator_path="$SIGNATURE_AGGREGATOR_PATH/build/signature-aggregator"
# Set the PATHS
GOPATH="$(go env GOPATH)"
@@ -36,4 +42,4 @@ git_commit=${RELAYER_COMMIT:-$( git rev-list -1 HEAD )}
export CGO_CFLAGS="-O -D__BLST_PORTABLE__"
# While CGO_ENABLED doesn't need to be explicitly set, it produces a much more
# clear error due to the default value change in go1.20.
-export CGO_ENABLED=1
\ No newline at end of file
+export CGO_ENABLED=1
diff --git a/signature-aggregator/README.md b/signature-aggregator/README.md
new file mode 100644
index 00000000..413a65d6
--- /dev/null
+++ b/signature-aggregator/README.md
@@ -0,0 +1,67 @@
+# Signature Aggregator
+
+This directory contains a lightweight stand-alone API for requesting signatures for a Warp message from Subnet validators.
+It is also used by `awm-relayer` for gathering signatures when configured to use AppRequest instead of the Warp signature RPC client.
+
+## Building
+
+To build the application run `scripts/build_signature_aggregator.sh` which will output the binary to `signature-aggregator/build/signature-aggregator` path by default.
+
+## Running
+
+To run the binary you must supply a config file via `./signature-aggregator --config-file`
+Currently required configurations are a small subset of the [`awm-relayer` configuration](https://github.com/ava-labs/awm-relayer?tab=readme-ov-file#configuration).
+
+Namely:
+- `LogLevel`: string
+- `PChainAPI`: APIConfig
+- `InfoAPI` : APIConfig
+- `APIPort` : (optional) defaults to 8080
+- `MetricsPort`: (optional) defaults to 8081
+
+Sample config that can be used for local testing is `signature-aggregator/sample-signature-aggregator-config.json`
+
+## Interface
+
+The only exposed endpoint is `/aggregate-signatures` expecting `application/json` encoded request with the following body. Note that all the fields are optional but at least one of `message` or `justification` must be non-empty:
+```json
+{
+ "message": "", // (string) hex-encoded unsigned message bytes to be signed
+ "justification": "", // (string) hex-encoded bytes to supply to the validators as justification
+ "signing-subnet-id": "", // (string) hex or cb58 encoded signing subnet ID. Defaults to source blockchain's subnet from data if omitted.
+ "quorum-percentage": 67 // (int) quorum percentage required to sign the message. Defaults to 67 if omitted
+}
+```
+
+## Sample workflow
+If you want to manually test a locally running service pointed to the Fuji testnet you can do so with the following steps.
+
+Note that this might fail for older messages if there has been enough validator churn, and less than the threshold weight of stake of validators have seen the message when it originated. In this case try picking a more recent message.
+
+The basic request consists of sending just the `unsigned-message` field containing the hex-encoded bytes of the full unsigned Warp message that the validators would be willing to sign. Here are the steps to obtain the bytes of a sample valid unsigned Warp message from the Fuji network.
+
+1. Find a valid on-chain `Receive Cross Chain Message` Transaction
+ This can be done by looking at the `Teleporter` [contract tracker](https://subnets-test.avax.network/c-chain/address/0x253b2784c75e510dD0fF1da844684a1aC0aa5fcf) on the Fuji-C
+2. Get the transaction receipt and logs for this transaction. This can be done through the explorer or via a curl to the API:
+```bash
+curl --location 'https://api.avax-test.network/ext/bc/C/rpc' \
+--header 'Content-Type: application/json' \
+--data '{
+ "jsonrpc": "2.0",
+ "method": "eth_getTransactionReceipt",
+ "params": [
+ ""
+ ],
+ "id": 1
+}'
+```
+3. Search these logs for the `SendWarpMessage` Log event emitted from the Warp precompile address (`0x0200000000000000000000000000000000000005`)
+   The topic of the message will be `0x56600c567728a800c0aa927500f831cb451df66a7af570eb4df4dfbf4674887d` which is the output of `cast keccak "SendWarpMessage(address,bytes32,bytes)"`
+4. Use the data field of the log message found in step 3 and send it to the locally running service via curl.
+```bash
+curl --location 'http://localhost:8080/aggregate-signatures' \
+--header 'Content-Type: application/json' \
+--data '{
+    "unsigned-message": ""
+}'
+```
diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go
new file mode 100644
index 00000000..f0f4c7d7
--- /dev/null
+++ b/signature-aggregator/aggregator/aggregator.go
@@ -0,0 +1,517 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package aggregator
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "math/big"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ava-labs/avalanchego/ids"
+ "github.com/ava-labs/avalanchego/message"
+ "github.com/ava-labs/avalanchego/proto/pb/p2p"
+ "github.com/ava-labs/avalanchego/subnets"
+ "github.com/ava-labs/avalanchego/utils/constants"
+ "github.com/ava-labs/avalanchego/utils/crypto/bls"
+ "github.com/ava-labs/avalanchego/utils/logging"
+ "github.com/ava-labs/avalanchego/utils/set"
+ avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
+ "github.com/ava-labs/awm-relayer/peers"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
+ "github.com/ava-labs/awm-relayer/utils"
+ coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
+ msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
+ "go.uber.org/zap"
+)
+
+type blsSignatureBuf [bls.SignatureLen]byte
+
+const (
+ // Number of retries to collect signatures from validators
+ maxRelayerQueryAttempts = 5
+ // Maximum amount of time to spend waiting (in addition to network round trip time per attempt)
+ // during relayer signature query routine
+ signatureRequestRetryWaitPeriodMs = 10_000
+)
+
+var (
+ codec = msg.Codec
+ coreEthCodec = coreEthMsg.Codec
+
+ // Errors
+ errNotEnoughSignatures = errors.New("failed to collect a threshold of signatures")
+ errNotEnoughConnectedStake = errors.New("failed to connect to a threshold of stake")
+)
+
+type SignatureAggregator struct {
+ network *peers.AppRequestNetwork
+ // protected by subnetsMapLock
+ subnetIDsByBlockchainID map[ids.ID]ids.ID
+ logger logging.Logger
+ messageCreator message.Creator
+ currentRequestID atomic.Uint32
+ subnetsMapLock sync.RWMutex
+ metrics *metrics.SignatureAggregatorMetrics
+}
+
+func NewSignatureAggregator(
+ network *peers.AppRequestNetwork,
+ logger logging.Logger,
+ metrics *metrics.SignatureAggregatorMetrics,
+ messageCreator message.Creator,
+) *SignatureAggregator {
+ sa := SignatureAggregator{
+ network: network,
+ subnetIDsByBlockchainID: map[ids.ID]ids.ID{},
+ logger: logger,
+ metrics: metrics,
+ messageCreator: messageCreator,
+ currentRequestID: atomic.Uint32{},
+ }
+ sa.currentRequestID.Store(rand.Uint32())
+ return &sa
+}
+
+func (s *SignatureAggregator) CreateSignedMessage(
+ unsignedMessage *avalancheWarp.UnsignedMessage,
+ inputSigningSubnet ids.ID,
+ quorumPercentage uint64,
+) (*avalancheWarp.Message, error) {
+ requestID := s.currentRequestID.Add(1)
+
+ var signingSubnet ids.ID
+ var err error
+ // If signingSubnet is not set we default to the subnet of the source blockchain
+ sourceSubnet, err := s.GetSubnetID(unsignedMessage.SourceChainID)
+ if err != nil {
+ return nil, fmt.Errorf(
+ "Source message subnet not found for chainID %s",
+ unsignedMessage.SourceChainID,
+ )
+ }
+ if inputSigningSubnet == ids.Empty {
+ signingSubnet = sourceSubnet
+ } else {
+ signingSubnet = inputSigningSubnet
+ }
+
+ connectedValidators, err := s.network.ConnectToCanonicalValidators(signingSubnet)
+
+ if err != nil {
+ msg := "Failed to connect to canonical validators"
+ s.logger.Error(
+ msg,
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Error(err),
+ )
+ s.metrics.FailuresToGetValidatorSet.Inc()
+ return nil, fmt.Errorf("%s: %w", msg, err)
+ }
+ if !utils.CheckStakeWeightPercentageExceedsThreshold(
+ big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
+ connectedValidators.TotalValidatorWeight,
+ quorumPercentage,
+ ) {
+ s.logger.Error(
+ "Failed to connect to a threshold of stake",
+ zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
+ zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
+ zap.Uint64("quorumPercentage", quorumPercentage),
+ )
+ s.metrics.FailuresToConnectToSufficientStake.Inc()
+ return nil, errNotEnoughConnectedStake
+ }
+
+ // TODO: remove this special handling and replace with ACP-118 interface once available
+ var reqBytes []byte
+ if sourceSubnet == constants.PrimaryNetworkID {
+ req := coreEthMsg.MessageSignatureRequest{
+ MessageID: unsignedMessage.ID(),
+ }
+ reqBytes, err = coreEthMsg.RequestToBytes(coreEthCodec, req)
+ } else {
+ req := msg.MessageSignatureRequest{
+ MessageID: unsignedMessage.ID(),
+ }
+ reqBytes, err = msg.RequestToBytes(codec, req)
+ }
+ if err != nil {
+ msg := "Failed to marshal request bytes"
+ s.logger.Error(
+ msg,
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Error(err),
+ )
+ return nil, fmt.Errorf("%s: %w", msg, err)
+ }
+
+ // Construct the AppRequest
+ outMsg, err := s.messageCreator.AppRequest(
+ unsignedMessage.SourceChainID,
+ requestID,
+ peers.DefaultAppRequestTimeout,
+ reqBytes,
+ )
+ if err != nil {
+ msg := "Failed to create app request message"
+ s.logger.Error(
+ msg,
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Error(err),
+ )
+ return nil, fmt.Errorf("%s: %w", msg, err)
+ }
+
+ // Query the validators with retries. On each retry, query one node per unique BLS pubkey
+ accumulatedSignatureWeight := big.NewInt(0)
+
+ signatureMap := make(map[int]blsSignatureBuf)
+ for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
+ responsesExpected := len(connectedValidators.ValidatorSet) - len(signatureMap)
+ s.logger.Debug(
+ "Aggregator collecting signatures from peers.",
+ zap.Int("attempt", attempt),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ zap.String("signingSubnetID", signingSubnet.String()),
+ zap.Int("validatorSetSize", len(connectedValidators.ValidatorSet)),
+ zap.Int("signatureMapSize", len(signatureMap)),
+ zap.Int("responsesExpected", responsesExpected),
+ )
+
+ vdrSet := set.NewSet[ids.NodeID](len(connectedValidators.ValidatorSet))
+ for i, vdr := range connectedValidators.ValidatorSet {
+ // If we already have the signature for this validator, do not query any of the composite nodes again
+ if _, ok := signatureMap[i]; ok {
+ continue
+ }
+
+ // TODO: Track failures and iterate through the validator's node list on subsequent query attempts
+ nodeID := vdr.NodeIDs[0]
+ vdrSet.Add(nodeID)
+ s.logger.Debug(
+ "Added node ID to query.",
+ zap.String("nodeID", nodeID.String()),
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ )
+
+ // Register a timeout response for each queried node
+ reqID := ids.RequestID{
+ NodeID: nodeID,
+ SourceChainID: unsignedMessage.SourceChainID,
+ DestinationChainID: unsignedMessage.SourceChainID,
+ RequestID: requestID,
+ Op: byte(message.AppResponseOp),
+ }
+ s.network.RegisterAppRequest(reqID)
+ }
+ responseChan := s.network.RegisterRequestID(requestID, vdrSet.Len())
+
+ sentTo := s.network.Send(outMsg, vdrSet, sourceSubnet, subnets.NoOpAllower)
+ s.logger.Debug(
+ "Sent signature request to network",
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Any("sentTo", sentTo),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ zap.String("sourceSubnetID", sourceSubnet.String()),
+ zap.String("signingSubnetID", signingSubnet.String()),
+ )
+ for nodeID := range vdrSet {
+ if !sentTo.Contains(nodeID) {
+ s.logger.Warn(
+ "Failed to make async request to node",
+ zap.String("nodeID", nodeID.String()),
+ zap.Error(err),
+ )
+ responsesExpected--
+ s.metrics.FailuresSendingToNode.Inc()
+ }
+ }
+
+ responseCount := 0
+ if responsesExpected > 0 {
+ for response := range responseChan {
+ s.logger.Debug(
+ "Processing response from node",
+ zap.String("nodeID", response.NodeID().String()),
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ )
+ signedMsg, relevant, err := s.handleResponse(
+ response,
+ sentTo,
+ requestID,
+ connectedValidators,
+ unsignedMessage,
+ signatureMap,
+ accumulatedSignatureWeight,
+ quorumPercentage,
+ )
+ if err != nil {
+ // don't increase node failures metric here, because we did
+ // it in handleResponse
+ return nil, fmt.Errorf(
+ "failed to handle response: %w",
+ err,
+ )
+ }
+ if relevant {
+ responseCount++
+ }
+ // If we have sufficient signatures, return here.
+ if signedMsg != nil {
+ s.logger.Info(
+ "Created signed message.",
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Uint64("signatureWeight", accumulatedSignatureWeight.Uint64()),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ )
+ return signedMsg, nil
+ }
+ // Break once we've had successful or unsuccessful responses from each requested node
+ if responseCount == responsesExpected {
+ break
+ }
+ }
+ }
+ if attempt != maxRelayerQueryAttempts {
+ // Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs
+ // TODO: We may want to consider an exponential back off rather than a uniform sleep period.
+ time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond)
+ }
+ }
+ s.logger.Warn(
+ "Failed to collect a threshold of signatures",
+ zap.Int("attempts", maxRelayerQueryAttempts),
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ )
+ return nil, errNotEnoughSignatures
+}
+
+// GetSubnetID returns the subnet ID recorded for blockchainID. The local map is consulted
+// first under the read lock; on a miss the ID is requested through the network client and,
+// if that succeeds, stored via SetSubnetID so that later calls are served from the map.
+// The lookup error, if any, is wrapped in the returned error.
+func (s *SignatureAggregator) GetSubnetID(blockchainID ids.ID) (ids.ID, error) {
+	s.subnetsMapLock.RLock()
+	subnetID, ok := s.subnetIDsByBlockchainID[blockchainID]
+	s.subnetsMapLock.RUnlock()
+	if ok {
+		return subnetID, nil
+	}
+	s.logger.Info("Signing subnet not found, requesting from PChain", zap.String("blockchainID", blockchainID.String()))
+	subnetID, err := s.network.GetSubnetID(blockchainID)
+	if err != nil {
+		// Keep the underlying cause in the chain so callers and logs can tell why the
+		// lookup failed instead of only seeing a generic "not found".
+		return ids.ID{}, fmt.Errorf("source blockchain not found for chain ID %s: %w", blockchainID, err)
+	}
+	s.SetSubnetID(blockchainID, subnetID)
+	return subnetID, nil
+}
+
+func (s *SignatureAggregator) SetSubnetID(blockchainID ids.ID, subnetID ids.ID) {
+ s.subnetsMapLock.Lock()
+ s.subnetIDsByBlockchainID[blockchainID] = subnetID
+ s.subnetsMapLock.Unlock()
+}
+
+// Attempts to create a signed Warp message from the accumulated responses.
+// Returns a non-nil Warp message if [accumulatedSignatureWeight] exceeds the signature verification threshold.
+// Returns false in the second return parameter if the app response is not relevant to the current signature
+// aggregation request. Returns an error only if a non-recoverable error occurs, otherwise returns a nil error
+// to continue processing responses.
+func (s *SignatureAggregator) handleResponse(
+ response message.InboundMessage,
+ sentTo set.Set[ids.NodeID],
+ requestID uint32,
+ connectedValidators *peers.ConnectedCanonicalValidators,
+ unsignedMessage *avalancheWarp.UnsignedMessage,
+ signatureMap map[int]blsSignatureBuf,
+ accumulatedSignatureWeight *big.Int,
+ quorumPercentage uint64,
+) (*avalancheWarp.Message, bool, error) {
+ // Regardless of the response's relevance, call it's finished handler once this function returns
+ defer response.OnFinishedHandling()
+
+ // Check if this is an expected response.
+ m := response.Message()
+ rcvReqID, ok := message.GetRequestID(m)
+ if !ok {
+ // This should never occur, since inbound message validity is already checked by the inbound handler
+ s.logger.Error("Could not get requestID from message")
+ return nil, false, nil
+ }
+ nodeID := response.NodeID()
+ if !sentTo.Contains(nodeID) || rcvReqID != requestID {
+ s.logger.Debug("Skipping irrelevant app response")
+ return nil, false, nil
+ }
+
+ // If we receive an AppRequestFailed, then the request timed out.
+ // This is still a relevant response, since we are no longer expecting a response from that node.
+ if response.Op() == message.AppErrorOp {
+ s.logger.Debug("Request timed out")
+ s.metrics.ValidatorTimeouts.Inc()
+ return nil, true, nil
+ }
+
+ validator, vdrIndex := connectedValidators.GetValidator(nodeID)
+ signature, valid := s.isValidSignatureResponse(unsignedMessage, response, validator.PublicKey)
+ if valid {
+ s.logger.Debug(
+ "Got valid signature response",
+ zap.String("nodeID", nodeID.String()),
+ zap.Uint64("stakeWeight", validator.Weight),
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ )
+ signatureMap[vdrIndex] = signature
+ accumulatedSignatureWeight.Add(accumulatedSignatureWeight, new(big.Int).SetUint64(validator.Weight))
+ } else {
+ s.logger.Debug(
+ "Got invalid signature response",
+ zap.String("nodeID", nodeID.String()),
+ zap.Uint64("stakeWeight", validator.Weight),
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ )
+ s.metrics.InvalidSignatureResponses.Inc()
+ return nil, true, nil
+ }
+
+ // As soon as the signatures exceed the stake weight threshold we try to aggregate and send the transaction.
+ if !utils.CheckStakeWeightPercentageExceedsThreshold(
+ accumulatedSignatureWeight,
+ connectedValidators.TotalValidatorWeight,
+ quorumPercentage,
+ ) {
+ // Not enough signatures, continue processing messages
+ return nil, true, nil
+ }
+ aggSig, vdrBitSet, err := s.aggregateSignatures(signatureMap)
+ if err != nil {
+ msg := "Failed to aggregate signature."
+ s.logger.Error(
+ msg,
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Error(err),
+ )
+ return nil, true, fmt.Errorf("%s: %w", msg, err)
+ }
+
+ signedMsg, err := avalancheWarp.NewMessage(
+ unsignedMessage,
+ &avalancheWarp.BitSetSignature{
+ Signers: vdrBitSet.Bytes(),
+ Signature: *(*[bls.SignatureLen]byte)(bls.SignatureToBytes(aggSig)),
+ },
+ )
+ if err != nil {
+ msg := "Failed to create new signed message"
+ s.logger.Error(
+ msg,
+ zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
+ zap.String("warpMessageID", unsignedMessage.ID().String()),
+ zap.Error(err),
+ )
+ return nil, true, fmt.Errorf("%s: %w", msg, err)
+ }
+ return signedMsg, true, nil
+}
+
+// isValidSignatureResponse tries to extract a BLS signature from the inbound app response and
+// verifies it against pubKey over the bytes of unsignedMessage. It returns the raw signature
+// and true only if the response is an AppResponse whose payload unmarshals into a signature
+// response carrying a non-empty signature that verifies. In every other case it returns an
+// empty buffer and false to indicate that no valid signature was found in the response.
+func (s *SignatureAggregator) isValidSignatureResponse(
+	unsignedMessage *avalancheWarp.UnsignedMessage,
+	response message.InboundMessage,
+	pubKey *bls.PublicKey,
+) (blsSignatureBuf, bool) {
+	// If the handler returned an error response, count the response and continue
+	if response.Op() == message.AppErrorOp {
+		s.logger.Debug(
+			"Relayer async response failed",
+			zap.String("nodeID", response.NodeID().String()),
+		)
+		return blsSignatureBuf{}, false
+	}
+
+	appResponse, ok := response.Message().(*p2p.AppResponse)
+	if !ok {
+		s.logger.Debug(
+			"Relayer async response was not an AppResponse",
+			zap.String("nodeID", response.NodeID().String()),
+		)
+		return blsSignatureBuf{}, false
+	}
+
+	var sigResponse msg.SignatureResponse
+	if _, err := msg.Codec.Unmarshal(appResponse.AppBytes, &sigResponse); err != nil {
+		s.logger.Error(
+			"Error unmarshaling signature response",
+			zap.String("nodeID", response.NodeID().String()),
+			zap.Error(err),
+		)
+		// Do not go on to inspect a response that failed to decode; report it as invalid.
+		return blsSignatureBuf{}, false
+	}
+	signature := sigResponse.Signature
+
+	// If the node returned an empty signature, then it has not yet seen the warp message. Retry later.
+	emptySignature := blsSignatureBuf{}
+	if bytes.Equal(signature[:], emptySignature[:]) {
+		s.logger.Debug(
+			"Response contained an empty signature",
+			zap.String("nodeID", response.NodeID().String()),
+		)
+		return blsSignatureBuf{}, false
+	}
+
+	sig, err := bls.SignatureFromBytes(signature[:])
+	if err != nil {
+		s.logger.Debug(
+			"Failed to create signature from response",
+			zap.String("nodeID", response.NodeID().String()),
+			zap.Error(err),
+		)
+		return blsSignatureBuf{}, false
+	}
+
+	if !bls.Verify(pubKey, sig, unsignedMessage.Bytes()) {
+		s.logger.Debug(
+			"Failed verification for signature",
+			zap.String("nodeID", response.NodeID().String()),
+		)
+		return blsSignatureBuf{}, false
+	}
+
+	return signature, true
+}
+
+// aggregateSignatures constructs a BLS aggregate signature from the collected validator signatures. Also
+// returns a bit set representing the validators that are represented in the aggregate signature. The bit
+// set is in canonical validator order.
+func (s *SignatureAggregator) aggregateSignatures(
+ signatureMap map[int]blsSignatureBuf,
+) (*bls.Signature, set.Bits, error) {
+ // Aggregate the signatures
+ signatures := make([]*bls.Signature, 0, len(signatureMap))
+ vdrBitSet := set.NewBits()
+
+ for i, sigBytes := range signatureMap {
+ sig, err := bls.SignatureFromBytes(sigBytes[:])
+ if err != nil {
+ msg := "Failed to unmarshal signature"
+ s.logger.Error(msg, zap.Error(err))
+ return nil, set.Bits{}, fmt.Errorf("%s: %w", msg, err)
+ }
+ signatures = append(signatures, sig)
+ vdrBitSet.Add(i)
+ }
+
+ aggSig, err := bls.AggregateSignatures(signatures)
+ if err != nil {
+ msg := "Failed to aggregate signatures"
+ s.logger.Error(msg, zap.Error(err))
+ return nil, set.Bits{}, fmt.Errorf("%s: %w", msg, err)
+ }
+ return aggSig, vdrBitSet, nil
+}
diff --git a/signature-aggregator/api/api.go b/signature-aggregator/api/api.go
new file mode 100644
index 00000000..2687e636
--- /dev/null
+++ b/signature-aggregator/api/api.go
@@ -0,0 +1,174 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package api
+
+import (
+ "encoding/hex"
+ "encoding/json"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/ava-labs/avalanchego/ids"
+ "github.com/ava-labs/avalanchego/utils/logging"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
+ "github.com/ava-labs/awm-relayer/types"
+ "github.com/ava-labs/awm-relayer/utils"
+ "go.uber.org/zap"
+)
+
+const (
+ APIPath = "/aggregate-signatures"
+ DefaultQuorumPercentage = 67
+)
+
+// Defines a request interface for signature aggregation for a raw unsigned message.
+type AggregateSignatureRequest struct {
+ // Required. hex-encoded message, optionally prefixed with "0x".
+ UnsignedMessage string `json:"unsigned-message"`
+ // Optional hex or cb58 encoded signing subnet ID. If omitted will default to the subnetID of the source blockchain
+ SigningSubnetID string `json:"signing-subnet-id"`
+ // Optional. Integer from 0 to 100 representing the percentage of the quorum that is required to sign the message
+ // defaults to 67 if omitted.
+ QuorumPercentage uint64 `json:"quorum-percentage"`
+}
+
+type AggregateSignatureResponse struct {
+ // hex encoding of the signature
+ SignedMessage string `json:"signed-message"`
+}
+
+func HandleAggregateSignaturesByRawMsgRequest(
+ logger logging.Logger,
+ metrics *metrics.SignatureAggregatorMetrics,
+ signatureAggregator *aggregator.SignatureAggregator,
+) {
+ http.Handle(
+ APIPath,
+ signatureAggregationAPIHandler(
+ logger,
+ metrics,
+ signatureAggregator,
+ ),
+ )
+}
+
+// writeJSONError writes errorMsg to w as a JSON object of the form {"error": "<errorMsg>"}
+// with a JSON content type. It does not set a status code itself. Marshalling and write
+// failures are logged rather than returned.
+//
+// NOTE(review): because no status code is set, error responses are presumably sent with the
+// default 200 status; confirm whether callers are expected to set one.
+func writeJSONError(
+	logger logging.Logger,
+	w http.ResponseWriter,
+	errorMsg string,
+) {
+	// The field must be exported (and tagged) for encoding/json to emit it; an unexported
+	// field is skipped and the body would always be "{}".
+	resp, err := json.Marshal(struct {
+		Error string `json:"error"`
+	}{Error: errorMsg})
+	if err != nil {
+		msg := "Error marshalling JSON error response"
+		logger.Error(msg, zap.Error(err))
+		resp = []byte(msg)
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+
+	// Check the error returned by Write itself rather than the earlier marshal error.
+	if _, err := w.Write(resp); err != nil {
+		logger.Error("Error writing error response", zap.Error(err))
+	}
+}
+
+// signatureAggregationAPIHandler returns the HTTP handler for the aggregate-signatures endpoint.
+//
+// The handler decodes an AggregateSignatureRequest from the JSON request body, hex-decodes
+// (with an optional "0x" prefix) and unpacks the unsigned Warp message, resolves the quorum
+// percentage (DefaultQuorumPercentage when zero, rejected when above 100) and the optional
+// signing subnet ID, and asks the aggregator for a signed message. On success it writes a
+// JSON-encoded AggregateSignatureResponse and records the request latency. On any failure it
+// reports the problem via writeJSONError and stops processing the request.
+func signatureAggregationAPIHandler(
+	logger logging.Logger,
+	metrics *metrics.SignatureAggregatorMetrics,
+	aggregator *aggregator.SignatureAggregator,
+) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		metrics.AggregateSignaturesRequestCount.Inc()
+		startTime := time.Now()
+
+		var req AggregateSignatureRequest
+		err := json.NewDecoder(r.Body).Decode(&req)
+		if err != nil {
+			msg := "Could not decode request body"
+			logger.Warn(msg, zap.Error(err))
+			writeJSONError(logger, w, msg)
+			return
+		}
+		var decodedMessage []byte
+		decodedMessage, err = hex.DecodeString(
+			strings.TrimPrefix(req.UnsignedMessage, "0x"),
+		)
+		if err != nil {
+			msg := "Could not decode message"
+			logger.Warn(
+				msg,
+				zap.String("msg", req.UnsignedMessage),
+				zap.Error(err),
+			)
+			writeJSONError(logger, w, msg)
+			return
+		}
+		unsignedMessage, err := types.UnpackWarpMessage(decodedMessage)
+		if err != nil {
+			msg := "Error unpacking warp message"
+			logger.Warn(msg, zap.Error(err))
+			writeJSONError(logger, w, msg)
+			return
+		}
+		quorumPercentage := req.QuorumPercentage
+		if quorumPercentage == 0 {
+			// An omitted (zero) quorum percentage selects the default.
+			quorumPercentage = DefaultQuorumPercentage
+		} else if req.QuorumPercentage > 100 {
+			msg := "Invalid quorum number"
+			logger.Warn(msg, zap.Uint64("quorum-num", req.QuorumPercentage))
+			writeJSONError(logger, w, msg)
+			return
+		}
+		// An empty signing subnet ID is left as the zero ID and passed through to the aggregator.
+		var signingSubnetID ids.ID
+		if req.SigningSubnetID != "" {
+			signingSubnetID, err = utils.HexOrCB58ToID(
+				req.SigningSubnetID,
+			)
+			if err != nil {
+				msg := "Error parsing signing subnet ID"
+				logger.Warn(
+					msg,
+					zap.Error(err),
+					zap.String("input", req.SigningSubnetID),
+				)
+				writeJSONError(logger, w, msg)
+				// Stop here: continuing would aggregate against an unintended subnet and
+				// write a second body after the error response.
+				return
+			}
+		}
+
+		signedMessage, err := aggregator.CreateSignedMessage(
+			unsignedMessage,
+			signingSubnetID,
+			quorumPercentage,
+		)
+		if err != nil {
+			msg := "Failed to aggregate signatures"
+			logger.Warn(msg, zap.Error(err))
+			writeJSONError(logger, w, msg)
+			// Stop here: signedMessage is nil on failure, so falling through would
+			// dereference a nil message when building the response.
+			return
+		}
+		resp, err := json.Marshal(
+			AggregateSignatureResponse{
+				SignedMessage: hex.EncodeToString(
+					signedMessage.Bytes(),
+				),
+			},
+		)
+
+		if err != nil {
+			msg := "Failed to marshal response"
+			logger.Error(msg, zap.Error(err))
+			writeJSONError(logger, w, msg)
+			return
+		}
+		w.Header().Set("Content-Type", "application/json")
+		_, err = w.Write(resp)
+		if err != nil {
+			logger.Error("Error writing response", zap.Error(err))
+		}
+		metrics.AggregateSignaturesLatencyMS.Set(
+			float64(time.Since(startTime).Milliseconds()),
+		)
+	})
+}
diff --git a/signature-aggregator/config/config.go b/signature-aggregator/config/config.go
new file mode 100644
index 00000000..868994e7
--- /dev/null
+++ b/signature-aggregator/config/config.go
@@ -0,0 +1,62 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package config
+
+import (
+ "fmt"
+
+ "github.com/ava-labs/avalanchego/utils/logging"
+ baseCfg "github.com/ava-labs/awm-relayer/config"
+)
+
+// Fallback ports that SetDefaultConfigValues applies when the configuration
+// does not set api-port or metrics-port.
+const (
+	defaultAPIPort     uint16 = 8080
+	defaultMetricsPort uint16 = 8081
+)
+
+// defaultLogLevel is the value SetDefaultConfigValues registers for the
+// log-level key.
+var defaultLogLevel = logging.Info.String()
+
+// usageText is the command-line help that DisplayUsageText prints.
+const usageText = `
+Usage:
+signature-aggregator --config-file path-to-config Specifies the config file and start the signing service.
+signature-aggregator --version Display signature-aggregator version and exit.
+signature-aggregator --help Display signature-aggregator usage and exit.
+`
+
+// Config is the top-level signature-aggregator configuration. BuildConfig
+// fills it by unmarshalling a viper instance; the struct tags give the key
+// each field is read from.
+type Config struct {
+ // LogLevel is the textual log level; main converts it with logging.ToLevel.
+ LogLevel string `mapstructure:"log-level" json:"log-level"`
+ // PChainAPI and InfoAPI are checked by Validate and exposed through
+ // GetPChainAPI and GetInfoAPI.
+ PChainAPI *baseCfg.APIConfig `mapstructure:"p-chain-api" json:"p-chain-api"`
+ InfoAPI *baseCfg.APIConfig `mapstructure:"info-api" json:"info-api"`
+ // APIPort is the port main passes to http.ListenAndServe.
+ APIPort uint16 `mapstructure:"api-port" json:"api-port"`
+
+ // MetricsPort is the port main passes to metrics.Initialize.
+ MetricsPort uint16 `mapstructure:"metrics-port" json:"metrics-port"`
+}
+
+// DisplayUsageText writes the usage text, followed by a newline, to stdout.
+func DisplayUsageText() {
+	fmt.Println(usageText)
+}
+
+// Validate checks that both API sections are present and that each one passes
+// its own validation. It does not modify any field of the Config.
+//
+// A missing p-chain-api or info-api section is reported as an error instead
+// of being handed to APIConfig.Validate as a nil pointer. The first failure
+// encountered is returned unchanged.
+func (c *Config) Validate() error {
+	if c.PChainAPI == nil || c.InfoAPI == nil {
+		return fmt.Errorf("%s and %s must both be set", PChainAPIKey, InfoAPIKey)
+	}
+	if err := c.PChainAPI.Validate(); err != nil {
+		return err
+	}
+	if err := c.InfoAPI.Validate(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// GetPChainAPI returns the configured P-Chain API settings.
+// Together with GetInfoAPI it lets Config implement the peers.Config interface.
+func (c *Config) GetPChainAPI() *baseCfg.APIConfig {
+ return c.PChainAPI
+}
+
+// GetInfoAPI returns the configured Info API settings.
+// Together with GetPChainAPI it lets Config implement the peers.Config interface.
+func (c *Config) GetInfoAPI() *baseCfg.APIConfig {
+ return c.InfoAPI
+}
diff --git a/signature-aggregator/config/flags.go b/signature-aggregator/config/flags.go
new file mode 100644
index 00000000..c8a30834
--- /dev/null
+++ b/signature-aggregator/config/flags.go
@@ -0,0 +1,14 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package config
+
+import "github.com/spf13/pflag"
+
+// BuildFlagSet returns the flag set understood by the signature-aggregator
+// binary: the config file path plus the version and help switches. Parse
+// errors are returned to the caller (ContinueOnError).
+func BuildFlagSet() *pflag.FlagSet {
+	flags := pflag.NewFlagSet("signature-aggregator", pflag.ContinueOnError)
+	flags.String(ConfigFileKey, "", "Specifies the signature-aggregator config file")
+	// Neither switch has a shorthand letter.
+	flags.Bool(VersionKey, false, "Display signature-aggregator version")
+	flags.Bool(HelpKey, false, "Display signature-aggregator usage")
+	return flags
+}
diff --git a/signature-aggregator/config/keys.go b/signature-aggregator/config/keys.go
new file mode 100644
index 00000000..4753f1a3
--- /dev/null
+++ b/signature-aggregator/config/keys.go
@@ -0,0 +1,18 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package config
+
+// Key names shared by the flag set, the viper lookups and the config file.
+const (
+ // Command line option keys (registered as flags by BuildFlagSet)
+ ConfigFileKey = "config-file"
+ VersionKey = "version"
+ HelpKey = "help"
+
+ // Top-level configuration keys; the values match the mapstructure tags on Config
+ LogLevelKey = "log-level"
+ PChainAPIKey = "p-chain-api"
+ InfoAPIKey = "info-api"
+ APIPortKey = "api-port"
+ MetricsPortKey = "metrics-port"
+)
diff --git a/signature-aggregator/config/viper.go b/signature-aggregator/config/viper.go
new file mode 100644
index 00000000..c88fd68d
--- /dev/null
+++ b/signature-aggregator/config/viper.go
@@ -0,0 +1,76 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package config
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/spf13/pflag"
+ "github.com/spf13/viper"
+)
+
+// NewConfig builds a Config from v with BuildConfig and then validates it.
+// A build error is returned as-is together with the partially built Config;
+// a validation error is wrapped and returned with a zero Config.
+func NewConfig(v *viper.Viper) (Config, error) {
+	built, buildErr := BuildConfig(v)
+	if buildErr != nil {
+		return built, buildErr
+	}
+
+	validateErr := built.Validate()
+	if validateErr != nil {
+		return Config{}, fmt.Errorf("failed to validate configuration: %w", validateErr)
+	}
+	return built, nil
+}
+
+// BuildViper creates a viper instance bound to the parsed flags and loads the
+// JSON config file named by the config-file key. That key must be supplied by
+// command line flag or environment variable; when it is not set the usage
+// text is printed and an error is returned. All other keys may come from the
+// config file or from environment variables.
+func BuildViper(fs *pflag.FlagSet) (*viper.Viper, error) {
+	vp := viper.New()
+	// Map key names to env var names: hyphens are replaced with underscores.
+	vp.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
+	vp.AutomaticEnv()
+
+	bindErr := vp.BindPFlags(fs)
+	if bindErr != nil {
+		return nil, bindErr
+	}
+
+	// The config file location is the only required setting.
+	if !vp.IsSet(ConfigFileKey) {
+		DisplayUsageText()
+		return nil, fmt.Errorf("config file not set")
+	}
+
+	vp.SetConfigFile(vp.GetString(ConfigFileKey))
+	vp.SetConfigType("json")
+	readErr := vp.ReadInConfig()
+	if readErr != nil {
+		return nil, readErr
+	}
+	return vp, nil
+}
+
+// SetDefaultConfigValues registers the fallback values for the log level, the
+// API port and the metrics port on v.
+func SetDefaultConfigValues(v *viper.Viper) {
+	fallbacks := []struct {
+		key   string
+		value interface{}
+	}{
+		{LogLevelKey, defaultLogLevel},
+		{APIPortKey, defaultAPIPort},
+		{MetricsPortKey, defaultMetricsPort},
+	}
+	for _, fb := range fallbacks {
+		v.SetDefault(fb.key, fb.value)
+	}
+}
+
+// BuildConfig applies the default values to v and unmarshals the result into
+// a signature aggregator Config. Sources are consulted in this order, each
+// taking precedence over the one below it:
+//  1. Flags
+//  2. Config file
+//
+// On an unmarshal failure the partially filled Config is returned together
+// with a wrapped error.
+func BuildConfig(v *viper.Viper) (Config, error) {
+	SetDefaultConfigValues(v)
+
+	var built Config
+	err := v.Unmarshal(&built)
+	if err != nil {
+		return built, fmt.Errorf("failed to unmarshal viper config: %w", err)
+	}
+	return built, nil
+}
diff --git a/signature-aggregator/main/main.go b/signature-aggregator/main/main.go
new file mode 100644
index 00000000..61c8eccc
--- /dev/null
+++ b/signature-aggregator/main/main.go
@@ -0,0 +1,133 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package main
+
+import (
+ "errors"
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+
+ "github.com/ava-labs/avalanchego/message"
+ "github.com/ava-labs/avalanchego/utils/constants"
+ "github.com/ava-labs/avalanchego/utils/logging"
+ "github.com/ava-labs/awm-relayer/peers"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/api"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/config"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
+ "github.com/prometheus/client_golang/prometheus"
+ "go.uber.org/zap"
+)
+
+// version is the string printed for the --version flag.
+// NOTE(review): presumably overridden at build time (e.g. via -ldflags) — confirm.
+var version = "v0.0.0-dev"
+
+// main is the signature-aggregator entry point. In order it: parses the
+// command line flags (handling --version and --help), loads and validates the
+// configuration, creates the logger, the app request network and the message
+// creator, initializes metrics, wires up the signature aggregator and its API
+// handler, and finally serves HTTP on the configured API port until the
+// server stops. Any start-up failure panics.
+func main() {
+ fs := config.BuildFlagSet()
+ if err := fs.Parse(os.Args[1:]); err != nil {
+ config.DisplayUsageText()
+ panic(fmt.Errorf("Failed to parse flags: %w", err))
+ }
+
+ // --version and --help print their output and exit before any config is read.
+ displayVersion, err := fs.GetBool(config.VersionKey)
+ if err != nil {
+ panic(fmt.Errorf("error reading %s flag: %w", config.VersionKey, err))
+ }
+ if displayVersion {
+ fmt.Printf("%s\n", version)
+ os.Exit(0)
+ }
+
+ help, err := fs.GetBool(config.HelpKey)
+ if err != nil {
+ panic(fmt.Errorf("error reading %s flag value: %w", config.HelpKey, err))
+ }
+ if help {
+ config.DisplayUsageText()
+ os.Exit(0)
+ }
+ v, err := config.BuildViper(fs)
+ if err != nil {
+ panic(fmt.Errorf("couldn't configure flags: %w", err))
+ }
+
+ cfg, err := config.NewConfig(v)
+ if err != nil {
+ panic(fmt.Errorf("couldn't build config: %w", err))
+ }
+
+ logLevel, err := logging.ToLevel(cfg.LogLevel)
+ if err != nil {
+ panic(fmt.Errorf("error with log level: %w", err))
+ }
+
+ // Logs are written to stdout using the JSON console encoder.
+ logger := logging.NewLogger(
+ "signature-aggregator",
+ logging.NewWrappedCore(
+ logLevel,
+ os.Stdout,
+ logging.JSON.ConsoleEncoder(),
+ ),
+ )
+
+ logger.Info("Initializing signature-aggregator")
+
+ // Initialize the global app request network
+ logger.Info("Initializing app request network")
+ // The app request network generates P2P networking logs that are verbose at the info level.
+ // Unless the log level is debug or lower, set the network log level to error to avoid spamming the logs.
+ // NOTE(review): the original comment here said metrics are not collected for
+ // the network, yet prometheus.DefaultRegisterer is passed below — confirm intent.
+ networkLogLevel := logging.Error
+ if logLevel <= logging.Debug {
+ networkLogLevel = logLevel
+ }
+ network, err := peers.NewNetwork(
+ networkLogLevel,
+ prometheus.DefaultRegisterer,
+ nil,
+ &cfg,
+ )
+ if err != nil {
+ logger.Fatal("Failed to create app request network", zap.Error(err))
+ panic(err)
+ }
+
+ // Initialize the message creator handed to the signature aggregator below.
+ // NOTE(review): as above, prometheus.DefaultRegisterer is passed although the
+ // original comment said no metrics are collected for the message creator.
+ messageCreator, err := message.NewCreator(
+ logger,
+ prometheus.DefaultRegisterer,
+ constants.DefaultNetworkCompressionType,
+ constants.DefaultNetworkMaximumInboundTimeout,
+ )
+ if err != nil {
+ logger.Fatal("Failed to create message creator", zap.Error(err))
+ panic(err)
+ }
+
+ // metrics.Initialize also starts the metrics HTTP listener on MetricsPort.
+ registry := metrics.Initialize(cfg.MetricsPort)
+ metricsInstance := metrics.NewSignatureAggregatorMetrics(registry)
+
+ signatureAggregator := aggregator.NewSignatureAggregator(
+ network,
+ logger,
+ metricsInstance,
+ messageCreator,
+ )
+
+ // presumably registers the API route on the default serve mux, which is what
+ // the nil handler passed to ListenAndServe below selects — verify in package api
+ api.HandleAggregateSignaturesByRawMsgRequest(
+ logger,
+ metricsInstance,
+ signatureAggregator,
+ )
+
+ // Blocks until the server stops; only an unexpected error is fatal.
+ err = http.ListenAndServe(fmt.Sprintf(":%d", cfg.APIPort), nil)
+ if errors.Is(err, http.ErrServerClosed) {
+ logger.Info("server closed")
+ } else if err != nil {
+ logger.Error("server error", zap.Error(err))
+ log.Fatal(err)
+ }
+}
diff --git a/signature-aggregator/metrics/metrics.go b/signature-aggregator/metrics/metrics.go
new file mode 100644
index 00000000..0cc3eefa
--- /dev/null
+++ b/signature-aggregator/metrics/metrics.go
@@ -0,0 +1,150 @@
+// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package metrics
+
+import (
+ "errors"
+ "fmt"
+ "log"
+ "net/http"
+
+ "github.com/ava-labs/avalanchego/api/metrics"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+// ErrFailedToCreateSignatureAggregatorMetrics is an exported sentinel error
+// for metric construction failures. No function in this file returns it.
+var (
+ ErrFailedToCreateSignatureAggregatorMetrics = errors.New(
+ "failed to create signature aggregator metrics",
+ )
+)
+
+// Opts holds the Prometheus options (name and help text) for every metric
+// created by NewSignatureAggregatorMetrics. It is exported so that other
+// packages can look the metrics up by their declared names.
+var Opts = struct {
+ AggregateSignaturesLatencyMS prometheus.GaugeOpts
+ AggregateSignaturesRequestCount prometheus.CounterOpts
+ FailuresToGetValidatorSet prometheus.CounterOpts
+ FailuresToConnectToSufficientStake prometheus.CounterOpts
+ FailuresSendingToNode prometheus.CounterOpts
+ ValidatorTimeouts prometheus.CounterOpts
+ InvalidSignatureResponses prometheus.CounterOpts
+}{
+ AggregateSignaturesLatencyMS: prometheus.GaugeOpts{
+ Name: "agg_sigs_latency_ms",
+ Help: "Latency of requests for aggregate signatures",
+ },
+ AggregateSignaturesRequestCount: prometheus.CounterOpts{
+ Name: "agg_sigs_req_count",
+ Help: "Number of requests for aggregate signatures",
+ },
+ FailuresToGetValidatorSet: prometheus.CounterOpts{
+ Name: "failures_to_get_validator_set",
+ Help: "Number of failed attempts to retrieve the validator set",
+ },
+ FailuresToConnectToSufficientStake: prometheus.CounterOpts{
+ Name: "failures_to_connect_to_sufficient_stake",
+ Help: "Number of incidents of connecting to some validators but not enough stake weight",
+ },
+ FailuresSendingToNode: prometheus.CounterOpts{
+ Name: "failures_sending_to_node",
+ Help: "Number of failures to send a request to a validator node",
+ },
+ ValidatorTimeouts: prometheus.CounterOpts{
+ Name: "validator_timeouts",
+ Help: "Number of timeouts while waiting for a validator to respond to a request",
+ },
+ InvalidSignatureResponses: prometheus.CounterOpts{
+ Name: "invalid_signature_responses",
+ Help: "Number of responses from validators that were not valid signatures",
+ },
+}
+
+// SignatureAggregatorMetrics groups the Prometheus collectors of the
+// signature aggregator: one latency gauge and six counters. Each field is
+// created from the Opts entry of the same name.
+type SignatureAggregatorMetrics struct {
+ AggregateSignaturesLatencyMS prometheus.Gauge
+ AggregateSignaturesRequestCount prometheus.Counter
+ FailuresToGetValidatorSet prometheus.Counter
+ FailuresToConnectToSufficientStake prometheus.Counter
+ FailuresSendingToNode prometheus.Counter
+ ValidatorTimeouts prometheus.Counter
+ InvalidSignatureResponses prometheus.Counter
+
+ // TODO: consider other failures to monitor. Issue #384 requires
+ // "network failures", but we probably don't handle those directly.
+ // Surely there are some error types specific to this layer that we can
+ // count.
+
+ // TODO: consider how the relayer keeps separate counts of aggregations
+ // by AppRequest vs by Warp API and whether we should have such counts.
+}
+
+// NewSignatureAggregatorMetrics creates every signature-aggregator collector
+// from its definition in Opts and registers all of them with registerer.
+// Registration goes through MustRegister, so a registration failure panics.
+func NewSignatureAggregatorMetrics(
+	registerer prometheus.Registerer,
+) *SignatureAggregatorMetrics {
+	created := &SignatureAggregatorMetrics{
+		AggregateSignaturesLatencyMS:       prometheus.NewGauge(Opts.AggregateSignaturesLatencyMS),
+		AggregateSignaturesRequestCount:    prometheus.NewCounter(Opts.AggregateSignaturesRequestCount),
+		FailuresToGetValidatorSet:          prometheus.NewCounter(Opts.FailuresToGetValidatorSet),
+		FailuresToConnectToSufficientStake: prometheus.NewCounter(Opts.FailuresToConnectToSufficientStake),
+		FailuresSendingToNode:              prometheus.NewCounter(Opts.FailuresSendingToNode),
+		ValidatorTimeouts:                  prometheus.NewCounter(Opts.ValidatorTimeouts),
+		InvalidSignatureResponses:          prometheus.NewCounter(Opts.InvalidSignatureResponses),
+	}
+
+	// Registered in the same order as the struct fields.
+	registerer.MustRegister(
+		created.AggregateSignaturesLatencyMS,
+		created.AggregateSignaturesRequestCount,
+		created.FailuresToGetValidatorSet,
+		created.FailuresToConnectToSufficientStake,
+		created.FailuresSendingToNode,
+		created.ValidatorTimeouts,
+		created.InvalidSignatureResponses,
+	)
+
+	return created
+}
+
+// HandleMetricsRequest registers a Prometheus handler for gatherer under
+// "/metrics" on the default HTTP serve mux.
+// NOTE(review): Initialize registers the same pattern; net/http panics when a
+// pattern is registered twice, so the two should not both be called.
+func (m *SignatureAggregatorMetrics) HandleMetricsRequest(
+	gatherer metrics.MultiGatherer,
+) {
+	metricsHandler := promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})
+	http.Handle("/metrics", metricsHandler)
+}
+
+// Initialize creates a fresh Prometheus registry, registers it with a prefix
+// gatherer under the name "signature-aggregator", exposes that gatherer at
+// "/metrics" on the default HTTP serve mux and starts an HTTP listener on the
+// given port in a background goroutine. It returns the registry so callers
+// can register collectors on it.
+//
+// It panics if the gatherer registration fails, and the process exits through
+// log.Fatalln if the listener stops.
+func Initialize(port uint16) *prometheus.Registry {
+	registry := prometheus.NewRegistry()
+	gatherer := metrics.NewPrefixGatherer()
+	if err := gatherer.Register("signature-aggregator", registry); err != nil {
+		panic(fmt.Errorf("failed to register metrics gatherer: %w", err))
+	}
+
+	metricsHandler := promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})
+	http.Handle("/metrics", metricsHandler)
+
+	listenAddr := fmt.Sprintf(":%d", port)
+	go func() {
+		// A nil handler means the default serve mux is served.
+		log.Fatalln(http.ListenAndServe(listenAddr, nil))
+	}()
+
+	return registry
+}
diff --git a/signature-aggregator/sample-signature-aggregator-config.json b/signature-aggregator/sample-signature-aggregator-config.json
new file mode 100644
index 00000000..0baf318e
--- /dev/null
+++ b/signature-aggregator/sample-signature-aggregator-config.json
@@ -0,0 +1,10 @@
+{
+ "log-level": "debug",
+ "info-api": {
+ "base-url": "https://api.avax-test.network"
+ },
+ "p-chain-api": {
+ "base-url": "https://api.avax-test.network"
+ },
+ "api-port": 8080
+}
\ No newline at end of file
diff --git a/tests/e2e_test.go b/tests/e2e_test.go
index 414a5c69..7de98215 100644
--- a/tests/e2e_test.go
+++ b/tests/e2e_test.go
@@ -153,4 +153,7 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() {
ginkgo.It("Warp API", func() {
WarpAPIRelay(localNetworkInstance)
})
+ ginkgo.It("Signature Aggregator", func() {
+ SignatureAggregatorAPI(localNetworkInstance)
+ })
})
diff --git a/tests/signature_aggregator_api.go b/tests/signature_aggregator_api.go
new file mode 100644
index 00000000..fe7eac7e
--- /dev/null
+++ b/tests/signature_aggregator_api.go
@@ -0,0 +1,183 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package tests
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/api"
+ "github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
+ testUtils "github.com/ava-labs/awm-relayer/tests/utils"
+ "github.com/ava-labs/teleporter/tests/interfaces"
+ "github.com/ava-labs/teleporter/tests/utils"
+ "github.com/ethereum/go-ethereum/log"
+ . "github.com/onsi/gomega"
+)
+
+// SignatureAggregatorAPI tests basic functionality of the Signature Aggregator API.
+// Setup step:
+// - Sets up a primary network and a subnet.
+// - Builds and runs a signature aggregator executable.
+// Test Case 1:
+// - Sends a teleporter message from the primary network to the subnet.
+// - Reads the warp message unsigned bytes from the log
+// - Sends the unsigned message to the signature aggregator API
+// - Confirms that the signed message is returned and matches the originally sent message
+// - Samples the metrics endpoint and checks each metric against its expected bound
+func SignatureAggregatorAPI(network interfaces.LocalNetwork) {
+	// Begin Setup step
+	ctx := context.Background()
+
+	subnetAInfo := network.GetPrimaryNetworkInfo()
+	subnetBInfo, _ := utils.GetTwoSubnets(network)
+	fundedAddress, fundedKey := network.GetFundedAccountInfo()
+
+	signatureAggregatorConfig := testUtils.CreateDefaultSignatureAggregatorConfig(
+		[]interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo},
+	)
+
+	signatureAggregatorConfigPath := testUtils.WriteSignatureAggregatorConfig(
+		signatureAggregatorConfig,
+		testUtils.DefaultSignatureAggregatorCfgFname,
+	)
+	log.Info("Starting the signature aggregator", "configPath", signatureAggregatorConfigPath)
+	signatureAggregatorCancel := testUtils.BuildAndRunSignatureAggregatorExecutable(ctx, signatureAggregatorConfigPath)
+	defer signatureAggregatorCancel()
+
+	// Sleep for some time to make sure signature aggregator has started up and subscribed.
+	log.Info("Waiting for the signature aggregator to start up")
+	time.Sleep(5 * time.Second)
+
+	// End setup step
+	// Begin Test Case 1
+
+	log.Info("Sending teleporter message")
+	receipt, _, _ := testUtils.SendBasicTeleporterMessage(
+		ctx,
+		subnetAInfo,
+		subnetBInfo,
+		fundedKey,
+		fundedAddress,
+	)
+	warpMessage := getWarpMessageFromLog(ctx, receipt, subnetAInfo)
+
+	// Only the unsigned message is set; the remaining request fields keep
+	// their zero values.
+	reqBody := api.AggregateSignatureRequest{
+		UnsignedMessage: "0x" + hex.EncodeToString(warpMessage.Bytes()),
+	}
+
+	client := http.Client{
+		Timeout: 20 * time.Second,
+	}
+
+	requestURL := fmt.Sprintf("http://localhost:%d%s", signatureAggregatorConfig.APIPort, api.APIPath)
+
+	// Send request to API
+	{
+		b, err := json.Marshal(reqBody)
+		Expect(err).Should(BeNil())
+		bodyReader := bytes.NewReader(b)
+
+		req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader)
+		Expect(err).Should(BeNil())
+		req.Header.Set("Content-Type", "application/json")
+
+		res, err := client.Do(req)
+		Expect(err).Should(BeNil())
+		// Register the close before any expectation on the response, so the
+		// body is released even when one of them fails.
+		defer res.Body.Close()
+		Expect(res.Status).Should(Equal("200 OK"))
+		Expect(res.Header.Get("Content-Type")).Should(Equal("application/json"))
+
+		body, err := io.ReadAll(res.Body)
+		Expect(err).Should(BeNil())
+
+		var response api.AggregateSignatureResponse
+		err = json.Unmarshal(body, &response)
+		Expect(err).Should(BeNil())
+
+		decodedMessage, err := hex.DecodeString(response.SignedMessage)
+		Expect(err).Should(BeNil())
+
+		// The signed message must carry the same ID as the message that was sent.
+		signedMessage, err := avalancheWarp.ParseMessage(decodedMessage)
+		Expect(err).Should(BeNil())
+		Expect(signedMessage.ID()).Should(Equal(warpMessage.ID()))
+	}
+
+	// Check metrics
+	metricsSample := sampleMetrics(signatureAggregatorConfig.MetricsPort)
+	for _, m := range []struct {
+		name  string
+		op    string
+		value int
+	}{
+		{metrics.Opts.AggregateSignaturesRequestCount.Name, "==", 1},
+		{metrics.Opts.AggregateSignaturesLatencyMS.Name, ">", 0},
+		{metrics.Opts.FailuresToGetValidatorSet.Name, "==", 0},
+		{metrics.Opts.FailuresToConnectToSufficientStake.Name, "==", 0},
+		{metrics.Opts.FailuresSendingToNode.Name, "<", 5},
+		{metrics.Opts.ValidatorTimeouts.Name, "==", 0},
+		{metrics.Opts.InvalidSignatureResponses.Name, "==", 0},
+	} {
+		Expect(metricsSample[m.name]).Should(
+			BeNumerically(m.op, m.value),
+		)
+	}
+}
+
+// sampleMetrics scrapes the /metrics endpoint on the given local port and
+// returns a map of metric names to metric samples. Only the metrics declared
+// in metrics.Opts are collected, keyed by their unprefixed declared name.
+// A metric that is absent from the scrape, or whose value does not parse as
+// an unsigned integer, is left out of the map.
+func sampleMetrics(port uint16) map[string]uint64 {
+	resp, err := http.Get(
+		fmt.Sprintf("http://localhost:%d/metrics", port),
+	)
+	Expect(err).Should(BeNil())
+	// Close the body even if a later expectation fails.
+	defer resp.Body.Close()
+
+	// Scraped lines carry this prefix in front of the declared metric name.
+	const exposedPrefix = "U__signature_2d_aggregator_"
+	metricNames := []string{
+		metrics.Opts.AggregateSignaturesLatencyMS.Name,
+		metrics.Opts.AggregateSignaturesRequestCount.Name,
+		metrics.Opts.FailuresToGetValidatorSet.Name,
+		metrics.Opts.FailuresToConnectToSufficientStake.Name,
+		metrics.Opts.FailuresSendingToNode.Name,
+		metrics.Opts.ValidatorTimeouts.Name,
+		metrics.Opts.InvalidSignatureResponses.Name,
+	}
+
+	samples := make(map[string]uint64, len(metricNames))
+	scanner := bufio.NewScanner(resp.Body)
+	for scanner.Scan() {
+		line := scanner.Text()
+		for _, metricName := range metricNames {
+			if !strings.HasPrefix(line, exposedPrefix+metricName) {
+				continue
+			}
+			log.Debug("Found metric line", "line", line)
+			parts := strings.Fields(line)
+
+			// Fetch the metric count from the last field of the line
+			value, err := strconv.ParseUint(parts[len(parts)-1], 10, 64)
+			if err != nil {
+				log.Warn("failed to parse value from metric line")
+				continue
+			}
+			log.Debug("parsed metric", "name", metricName, "value", value)
+
+			samples[metricName] = value
+		}
+	}
+	// Without this check a read error would silently truncate the sample set.
+	Expect(scanner.Err()).Should(BeNil())
+	return samples
+}
diff --git a/tests/utils/utils.go b/tests/utils/utils.go
index 99bbd41d..4543aa03 100644
--- a/tests/utils/utils.go
+++ b/tests/utils/utils.go
@@ -21,6 +21,7 @@ import (
warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload"
"github.com/ava-labs/awm-relayer/config"
offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry"
+ signatureaggregatorcfg "github.com/ava-labs/awm-relayer/signature-aggregator/config"
batchcrosschainmessenger "github.com/ava-labs/awm-relayer/tests/abi-bindings/go/BatchCrossChainMessenger"
relayerUtils "github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/subnet-evm/accounts/abi/bind"
@@ -43,13 +44,14 @@ import (
var StorageLocation = fmt.Sprintf("%s/.awm-relayer-storage", os.TempDir())
const (
- DefaultRelayerCfgFname = "relayer-config.json"
- DBUpdateSeconds = 1
+ DefaultRelayerCfgFname = "relayer-config.json"
+ DefaultSignatureAggregatorCfgFname = "signature-aggregator-config.json"
+ DBUpdateSeconds = 1
)
func BuildAndRunRelayerExecutable(ctx context.Context, relayerConfigPath string) context.CancelFunc {
// Build the awm-relayer binary
- cmd := exec.Command("./scripts/build.sh")
+ cmd := exec.Command("./scripts/build_relayer.sh")
out, err := cmd.CombinedOutput()
fmt.Println(string(out))
Expect(err).Should(BeNil())
@@ -102,6 +104,67 @@ func BuildAndRunRelayerExecutable(ctx context.Context, relayerConfigPath string)
}
}
+func BuildAndRunSignatureAggregatorExecutable(ctx context.Context, configPath string) context.CancelFunc {
+ // Build the signature-aggregator binary
+ cmd := exec.Command("./scripts/build_signature_aggregator.sh")
+ out, err := cmd.CombinedOutput()
+ fmt.Println(string(out))
+ Expect(err).Should(BeNil())
+
+ cmdOutput := make(chan string)
+
+ // Run signature-aggregator binary with config path
+ var signatureAggregatorCtx context.Context
+ signatureAggregatorCtx, signatureAggregatorCancelFunc := context.WithCancel(ctx)
+ log.Info("Instantiating the signature-aggregator executable command")
+ log.Info(fmt.Sprintf("./signature-aggregator/build/signature-aggregator --config-file %s ", configPath))
+ signatureAggregatorCmd := exec.CommandContext(
+ signatureAggregatorCtx,
+ "./signature-aggregator/build/signature-aggregator",
+ "--config-file",
+ configPath,
+ )
+
+ // Set up a pipe to capture the command's output
+ cmdStdOutReader, err := signatureAggregatorCmd.StdoutPipe()
+ Expect(err).Should(BeNil())
+ cmdStdErrReader, err := signatureAggregatorCmd.StderrPipe()
+ Expect(err).Should(BeNil())
+
+ // Start the command
+ log.Info("Starting the signature-aggregator executable")
+ err = signatureAggregatorCmd.Start()
+ Expect(err).Should(BeNil())
+
+ // Start goroutines to read and output the command's stdout and stderr
+ go func() {
+ scanner := bufio.NewScanner(cmdStdOutReader)
+ for scanner.Scan() {
+ log.Info(scanner.Text())
+ }
+ cmdOutput <- "Command execution finished"
+ }()
+ go func() {
+ scanner := bufio.NewScanner(cmdStdErrReader)
+ for scanner.Scan() {
+ log.Error(scanner.Text())
+ }
+ cmdOutput <- "Command execution finished"
+ }()
+ // Spawn a goroutine that will panic if the aggregator exits abnormally.
+ go func() {
+ err := signatureAggregatorCmd.Wait()
+ // Context cancellation is the only expected way for the process to exit, otherwise panic
+ if !errors.Is(signatureAggregatorCtx.Err(), context.Canceled) {
+ panic(fmt.Errorf("signature-aggregator exited abnormally: %w", err))
+ }
+ }()
+ return func() {
+ signatureAggregatorCancelFunc()
+ <-signatureAggregatorCtx.Done()
+ }
+}
+
func ReadHexTextFile(filename string) string {
fileData, err := os.ReadFile(filename)
Expect(err).Should(BeNil())
@@ -210,6 +273,32 @@ func CreateDefaultRelayerConfig(
}
}
+// CreateDefaultSignatureAggregatorConfig returns a signature-aggregator config
+// for tests. Both the P-Chain API and the Info API point at the first node URI
+// of the first entry in sourceSubnetsInfo, which must therefore be non-empty.
+// The API and metrics ports are fixed at 8080 and 8081.
+//
+// The log level is taken from the LOG_LEVEL environment variable and falls
+// back to info when that variable is unset or not a valid level.
+func CreateDefaultSignatureAggregatorConfig(
+	sourceSubnetsInfo []interfaces.SubnetTestInfo,
+) signatureaggregatorcfg.Config {
+	logLevel, err := logging.ToLevel(os.Getenv("LOG_LEVEL"))
+	if err != nil {
+		logLevel = logging.Info
+	}
+
+	log.Info(
+		"Setting up signature aggregator config",
+		"logLevel", logLevel.LowerString(),
+	)
+	// Every API endpoint uses the same node.
+	nodeURI := sourceSubnetsInfo[0].NodeURIs[0]
+	return signatureaggregatorcfg.Config{
+		// Use the level that was just resolved and logged, not a fixed one.
+		LogLevel: logLevel.LowerString(),
+		PChainAPI: &config.APIConfig{
+			BaseURL: nodeURI,
+		},
+		InfoAPI: &config.APIConfig{
+			BaseURL: nodeURI,
+		},
+		APIPort:     8080,
+		MetricsPort: 8081,
+	}
+}
+
func ClearRelayerStorage() error {
return os.RemoveAll(StorageLocation)
}
@@ -420,6 +509,22 @@ func WriteRelayerConfig(relayerConfig config.Config, fname string) string {
return relayerConfigPath
}
+// WriteSignatureAggregatorConfig serializes signatureAggregatorConfig as
+// tab-indented JSON into a new temporary file and returns that file's path.
+// fname is used as the name pattern passed to os.CreateTemp, so the final
+// file name is derived from it rather than equal to it.
+//
+// TODO define interface over Config and write a generic function to write either config
+func WriteSignatureAggregatorConfig(signatureAggregatorConfig signatureaggregatorcfg.Config, fname string) string {
+	data, err := json.MarshalIndent(signatureAggregatorConfig, "", "\t")
+	Expect(err).Should(BeNil())
+
+	f, err := os.CreateTemp(os.TempDir(), fname)
+	Expect(err).Should(BeNil())
+
+	_, err = f.Write(data)
+	Expect(err).Should(BeNil())
+	// Close the file so the descriptor is released before the path is handed
+	// to the aggregator process.
+	Expect(f.Close()).Should(BeNil())
+	signatureAggregatorConfigPath := f.Name()
+
+	log.Info("Created signature-aggregator config", "configPath", signatureAggregatorConfigPath, "config", string(data))
+	return signatureAggregatorConfigPath
+}
+
func TriggerProcessMissedBlocks(
ctx context.Context,
sourceSubnetInfo interfaces.SubnetTestInfo,
diff --git a/utils/utils.go b/utils/utils.go
index 36b5cd2f..872d668e 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -54,6 +54,15 @@ func CheckStakeWeightExceedsThreshold(
return scaledTotalWeight.Cmp(scaledSigWeight) != 1
}
+// Wrapper for CheckStakeWeightExceedThreshold with a quorumDen of 100.
+func CheckStakeWeightPercentageExceedsThreshold(
+ accumulatedSignatureWeight *big.Int,
+ totalWeight uint64,
+ stakeWeightPercentage uint64,
+) bool {
+ return CheckStakeWeightExceedsThreshold(accumulatedSignatureWeight, totalWeight, stakeWeightPercentage, 100)
+}
+
//
// Chain Utils
//