From 5af8845c7ea6db1810a3ce5fbde683002a8169e8 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 26 Jun 2024 07:18:14 +0530 Subject: [PATCH] chore_: update go-waku from master --- go.mod | 2 +- go.sum | 4 +-- .../waku-org/go-waku/waku/v2/api/filter.go | 19 ++++++----- .../go-waku/waku/v2/node/wakunode2.go | 4 +-- .../go-waku/waku/v2/node/wakuoptions.go | 14 ++++++++ .../go-waku/waku/v2/onlinechecker/online.go | 24 ++++++++++++++ .../waku/v2/peermanager/peer_connector.go | 32 ++++++++++++++----- .../waku/v2/peermanager/peer_discovery.go | 2 +- .../go-waku/waku/v2/protocol/filter/client.go | 23 +++++++++++-- .../waku/v2/protocol/filter/test_utils.go | 3 +- vendor/modules.txt | 3 +- 11 files changed, 101 insertions(+), 29 deletions(-) create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go diff --git a/go.mod b/go.mod index b6a84afa7d..b56fdc6e81 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659 + github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 6e8bbc9182..2b0869ef78 100644 --- a/go.sum +++ b/go.sum @@ -2137,8 +2137,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659 h1:bTz24QgoRBXoS/WIy8+7jrQ/7hpE63td0fMteq5qZQM= -github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k= +github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5 h1:9UyIIy/IvlJB2nHIXydne6OfNfOWPPL08+XmCI3iEBo= +github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go index f7bfa73933..2798d13739 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" @@ -39,13 +40,13 @@ type Sub struct { cancel context.CancelFunc log *zap.Logger closing chan string - isNodeOnline bool //indicates if node has connectivity, this helps subscribe loop takes decision as to resubscribe or not. + onlineChecker onlinechecker.OnlineChecker resubscribeInProgress bool id string } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, online bool) (*Sub, error) { +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf @@ -56,14 +57,16 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte sub.Config = config sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) sub.log.Debug("filter subscribe params", zap.Int("max-peers", config.MaxPeers)) - sub.isNodeOnline = online sub.closing = make(chan string, config.MaxPeers) - if online { + + sub.onlineChecker = wf.OnlineChecker() + if wf.OnlineChecker().IsOnline() { subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) if err == nil { sub.multiplex(subs) } } + go sub.subscriptionLoop() return sub, nil } @@ -72,17 +75,13 @@ func (apiSub *Sub) Unsubscribe() { apiSub.cancel() } -func (apiSub *Sub) SetNodeState(online bool) { - apiSub.isNodeOnline = online -} - func (apiSub *Sub) subscriptionLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers && + if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers && !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { apiSub.closing <- "" } @@ -109,7 +108,7 @@ func (apiSub *Sub) checkAndResubscribe(subId string) { delete(apiSub.subs, subId) } apiSub.log.Debug("subscription status", zap.Int("sub-count", len(apiSub.subs)), zap.Stringer("content-filter", apiSub.ContentFilter)) - if apiSub.isNodeOnline && len(apiSub.subs) < apiSub.Config.MaxPeers { + if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers { apiSub.resubscribe(failedPeer) } apiSub.resubscribeInProgress = false diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 4b3508a467..d8c280c17d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -258,7 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { //Initialize peer manager. w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log) - w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log) + w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log) if err != nil { w.log.Error("creating peer connection strategy", zap.Error(err)) } @@ -290,7 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager)) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) - w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) + w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index a34376c189..26a82d0d09 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -25,6 +25,7 @@ import ( "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/prometheus/client_golang/prometheus" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" @@ -63,6 +64,8 @@ type WakuNodeParameters struct { circuitRelayMinInterval time.Duration circuitRelayBootDelay time.Duration + onlineChecker onlinechecker.OnlineChecker + enableNTP bool ntpURLs []string @@ -130,6 +133,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP), WithCircuitRelayParams(2*time.Second, 3*time.Minute), WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), + WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), } // MultiAddresses return the list of multiaddresses configured in the node @@ -554,6 +558,16 @@ func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuN } } +// WithOnlineChecker sets up an OnlineChecker which will be used to determine whether the node +// is online or not. The OnlineChecker must be implemented by consumers of go-waku since they +// have additional context to determine what it means for a node to be online/offline +func WithOnlineChecker(onlineChecker onlinechecker.OnlineChecker) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.onlineChecker = onlineChecker + return nil + } +} + // Default options used in the libp2p node var DefaultLibP2POptions = []libp2p.Option{ libp2p.ChainOptions( diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go b/vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go new file mode 100644 index 0000000000..814d57e752 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/onlinechecker/online.go @@ -0,0 +1,24 @@ +package onlinechecker + +// OnlineChecker is used to determine if node has connectivity. +type OnlineChecker interface { + IsOnline() bool +} + +type DefaultOnlineChecker struct { + online bool +} + +func NewDefaultOnlineChecker(online bool) OnlineChecker { + return &DefaultOnlineChecker{ + online: online, + } +} + +func (o *DefaultOnlineChecker) SetOnline(online bool) { + o.online = online +} + +func (o *DefaultOnlineChecker) IsOnline() bool { + return o.online +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go index e53d9c09de..ebe808e85f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/service" @@ -27,10 +28,11 @@ import ( // PeerConnectionStrategy is a utility to connect to peers, // but only if we have not recently tried connecting to them already type PeerConnectionStrategy struct { - mux sync.Mutex - cache *lru.TwoQueueCache - host host.Host - pm *PeerManager + mux sync.Mutex + cache *lru.TwoQueueCache + host host.Host + pm *PeerManager + onlineChecker onlinechecker.OnlineChecker paused atomic.Bool dialTimeout time.Duration @@ -59,8 +61,12 @@ func getBackOff() backoff.BackoffFactory { // // dialTimeout is how long we attempt to connect to a peer before giving up // minPeers is the minimum number of peers that the node should have -func NewPeerConnectionStrategy(pm *PeerManager, - dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) { +func NewPeerConnectionStrategy( + pm *PeerManager, + onlineChecker onlinechecker.OnlineChecker, + dialTimeout time.Duration, + logger *zap.Logger, +) (*PeerConnectionStrategy, error) { // cacheSize is the size of a TwoQueueCache cacheSize := 600 cache, err := lru.New2Q(cacheSize) @@ -72,6 +78,7 @@ func NewPeerConnectionStrategy(pm *PeerManager, cache: cache, dialTimeout: dialTimeout, CommonDiscoveryService: service.NewCommonDiscoveryService(), + onlineChecker: onlineChecker, pm: pm, backoff: getBackOff(), logger: logger.Named("discovery-connector"), @@ -171,6 +178,10 @@ func (c *PeerConnectionStrategy) isPaused() bool { return c.paused.Load() } +func (c *PeerConnectionStrategy) SetPaused(paused bool) { + c.paused.Store(paused) +} + // it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set. func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { @@ -234,10 +245,17 @@ func (c *PeerConnectionStrategy) dialPeers() { for { select { + case <-c.Context().Done(): + return case pd, ok := <-c.GetListeningChan(): if !ok { return } + + if !c.onlineChecker.IsOnline() { + continue + } + addrInfo := pd.AddrInfo if addrInfo.ID == c.host.ID() || addrInfo.ID == "" || @@ -250,8 +268,6 @@ func (c *PeerConnectionStrategy) dialPeers() { c.WaitGroup().Add(1) go c.dialPeer(addrInfo, sem) } - case <-c.Context().Done(): - return } } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go index 6d05c69ba1..ae18907c02 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_discovery.go @@ -55,7 +55,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol] if !ok { - pm.logger.Info("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) + pm.logger.Warn("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) return nil, errors.New("cannot do on demand discovery for non-waku protocol") } iterator, err := pm.discoveryService.PeerIterator( diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 70edd644c3..5909bbbd8c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -18,6 +18,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -45,7 +46,8 @@ var ( type WakuFilterLightNode struct { *service.CommonService h host.Host - broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s + broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer. + onlineChecker onlinechecker.OnlineChecker timesource timesource.Timesource metrics Metrics log *zap.Logger @@ -79,12 +81,19 @@ func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError { // Note that broadcaster is optional. // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, - timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger) *WakuFilterLightNode { +func NewWakuFilterLightNode( + broadcaster relay.Broadcaster, + pm *peermanager.PeerManager, + timesource timesource.Timesource, + onlineChecker onlinechecker.OnlineChecker, + reg prometheus.Registerer, + log *zap.Logger, +) *WakuFilterLightNode { wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource + wf.onlineChecker = onlineChecker wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) @@ -701,3 +710,11 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte return wf.unsubscribeAll(ctx, opts...) } + +func (wf *WakuFilterLightNode) OnlineChecker() onlinechecker.OnlineChecker { + return wf.onlineChecker +} + +func (wf *WakuFilterLightNode) SetOnlineChecker(onlineChecker onlinechecker.OnlineChecker) { + wf.onlineChecker = onlineChecker +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go index 8be1df3588..361ab561b6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -165,7 +166,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log) - filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) filterPush.SetHost(host) pm.SetHost(host) return LightNodeData{filterPush, host} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5aebf5910e..a7bac7fad9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1015,7 +1015,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240625022701-1cf180ebc659 +# github.com/waku-org/go-waku v0.8.1-0.20240626004844-19a47a1ac1f5 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests @@ -1025,6 +1025,7 @@ github.com/waku-org/go-waku/waku/v2/discv5 github.com/waku-org/go-waku/waku/v2/dnsdisc github.com/waku-org/go-waku/waku/v2/hash github.com/waku-org/go-waku/waku/v2/node +github.com/waku-org/go-waku/waku/v2/onlinechecker github.com/waku-org/go-waku/waku/v2/payload github.com/waku-org/go-waku/waku/v2/peermanager github.com/waku-org/go-waku/waku/v2/peerstore