Skip to content

peermanager plugin #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Open
46 changes: 46 additions & 0 deletions plugins/peermanager/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package peermanager

import (
"fmt"
"strings"

"github.com/Shopify/sarama"
"github.com/openrelayxyz/cardinal-streams/transports"
)

func strPtr(x string) *string {
return &x
}

func createProducer(broker, topic string) (sarama.AsyncProducer, error) {

sessionBrokers, sessionKafkaConfig = transports.ParseKafkaURL(strings.TrimPrefix(broker, "kafka://"))
configEntries := make(map[string]*string)
configEntries["retention.ms"] = strPtr("3600000")

if err := transports.CreateTopicIfDoesNotExist(strings.TrimPrefix(broker, "kafka://"), topic, 1, configEntries); err != nil {
panic(fmt.Sprintf("Could not create topic %v on broker %v: %v", topic, broker, err.Error()))
}

producer, err := sarama.NewAsyncProducer(sessionBrokers, sessionKafkaConfig)
if err != nil {
panic(fmt.Sprintf("Could not setup producer, peer manager plugin: %v", err.Error()))
}

return producer, nil
}

func createConsumer(broker, topic string) (sarama.PartitionConsumer, error) {

consumer, err := sarama.NewConsumer(sessionBrokers, sessionKafkaConfig)
if err != nil {
return nil, err
}

partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
return nil, err
}

return partitionConsumer, nil
}
108 changes: 108 additions & 0 deletions plugins/peermanager/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package peermanager

import (
"fmt"

"github.com/Shopify/sarama"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/openrelayxyz/xplugeth"
"github.com/openrelayxyz/xplugeth/hooks/initialize"
"github.com/openrelayxyz/xplugeth/types"
)

type peerManagerConfig struct {
BrokerURL string `yaml:"peerManager.broker.url"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the yaml broker.url. Since plugeth2 will have its own config files for each plugin, we don't need to namespace it here too.

}

var cfg *peerManagerConfig

var (
httpApiFlagName = "http.api"
sessionStack node.Node
sessionBrokers []string
sessionKafkaConfig *sarama.Config
nodes = make(chan string, 5)
exit = make(chan struct{}, 1)
peerBroker string
)

type peerManagerModule struct{}

func init() {
xplugeth.RegisterModule[peerManagerModule]("peerManagerModule")
}

func (*peerManagerModule) InitializeNode(s *node.Node, b types.Backend) {
sessionStack = *s
var ok bool
cfg, ok = xplugeth.GetConfig[peerManagerConfig]("peerManager")
peerBroker = cfg.BrokerURL
if !ok {
log.Warn("did not acqire config, example plugin, all values set to default")
}
log.Info("Initialized node, peer manager plugin")
}

func (*peerManagerModule) Blockchain() {
if peerBroker == "" {
panic(fmt.Sprintf("no broker provided for peer manager plugin"))
}
go peeringSequence()
}

func peeringSequence() {
sessionPeerService, err := getPeerManager()
if err != nil {
log.Error("session peer service unavailable, peer manager plugin", "err", err)
return
}

selfNode, err := sessionPeerService.getEnode()
if err != nil {
log.Error("error calling getEnode from sessionService, peer manager plugin", "err", err)
}

chainTopic, err := sessionPeerService.chainIdResolver()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's pull the topic from the config. We can use chainIdResolver() if it's not explicitly set, but making this configurable would let us have separate clusters of nodes we don't force to peer with eachother.

if err != nil {
log.Error("Error aquiring chainID, peer manager plugin", "err", err)
}

producer, err := createProducer(peerBroker, chainTopic)
if err != nil {
log.Error("failed to acquire kafka producer, peer manager plugin", "err", err)
return
}

consumer, err := createConsumer(peerBroker, chainTopic)
if err != nil {
log.Error("failed to acquire kafka consumer, peer manager plugin", "err", err)
return
}

msg := &sarama.ProducerMessage{
Topic: chainTopic,
Value: sarama.StringEncoder(selfNode),
}

producer.Input() <- msg

go func() {
for message := range consumer.Messages() {
nodes <- string(message.Value)
}
}()

for message := range nodes {
if message == selfNode {
continue
} else {
sessionPeerService.attachPeers(message)
}
}
}

var (
_ initialize.Blockchain = (*peerManagerModule)(nil)
_ initialize.Initializer = (*peerManagerModule)(nil)
)
84 changes: 84 additions & 0 deletions plugins/peermanager/peer-manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package peermanager

import (
"fmt"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

type PeerManagerService struct {
client *rpc.Client
}

type pseudoNodeInfo struct {
Enode string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add JSON hints here. I don't like relying on struct field names for JSON encoding / decoding when we can be explicit.

}

func getPeerManager() (*PeerManagerService, error) {
c := sessionStack.Attach()

s := &PeerManagerService{
client: c,
}
return s, nil
}

func (service *PeerManagerService) getEnode() (string, error) {
var enode pseudoNodeInfo
err := service.client.Call(&enode, "admin_nodeInfo")
if err != nil {
return "", err
}
return enode.Enode, nil
}

func (service *PeerManagerService) attachPeers(peer string) {

var addTrustedPeerResult bool
err := service.client.Call(&addTrustedPeerResult, "admin_addTrustedPeer", peer)
if err != nil {
log.Error("error calling admin_addTrustedPeer, peer manager plugin", "peer", peer, "err", err)
}
if !addTrustedPeerResult {
log.Error("addTrustedPeer returned false, peer manager plugin", "peer", peer, "err", err)
}

var addPeerResult bool
err = service.client.Call(&addPeerResult, "admin_addPeer", peer)
if err != nil {
log.Error("error calling admin_addPeer, peer manager plugin", "peer", peer, "err", err)
}
if !addPeerResult {
log.Error("addPeer returned false, peer manager plugin", "peer", peer, "err", err)
}
log.Info("added peer, peer manager plugin", "added", peer)
}

func (service *PeerManagerService) chainIdResolver() (string, error) {

var chainID string
err := service.client.Call(&chainID, "eth_chainId")
if err != nil {
return "", err
}
var result string
switch chainID {
case "0x1":
result = "mainnet"
case "0x3d":
result = "etc"
case "0x4268":
result = "holesky"
case "0xaa36a7":
result = "sepolia"
case "0x89":
result = "polygon"
case "0x13881":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add amoy here too:

"0x13882"

result = "mumbai"
default:
log.Warn("unknown chain, chainID could not be resolved, peer manager plugin")
result = chainID
}
return fmt.Sprintf("peers-%v", result), nil
}