Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/natmanager"
"github.com/libp2p/go-libp2p/p2p/host/observedaddrs"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
Expand All @@ -39,6 +41,7 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/security/insecure"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcpreuse"
Expand Down Expand Up @@ -117,9 +120,10 @@ type Config struct {
ConnManager connmgr.ConnManager
ResourceManager network.ResourceManager

NATManager NATManagerC
Peerstore peerstore.Peerstore
Reporter metrics.Reporter
EnableNATPortMap bool
NATManager bhost.NATManager
Peerstore peerstore.Peerstore
Reporter metrics.Reporter

MultiaddrResolver network.MultiaddrDNSResolver

Expand Down Expand Up @@ -446,13 +450,10 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus, an *auton
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
EnablePing: !cfg.DisablePing,
UserAgent: cfg.UserAgent,
ProtocolVersion: cfg.ProtocolVersion,
EnableHolePunching: cfg.EnableHolePunching,
HolePunchingOptions: cfg.HolePunchingOptions,
EnableRelayService: cfg.EnableRelayService,
RelayServiceOpts: cfg.RelayServiceOpts,
EnableMetrics: !cfg.DisableMetrics,
PrometheusRegisterer: cfg.PrometheusRegisterer,
AutoNATv2: an,
Expand Down Expand Up @@ -554,6 +555,17 @@ func (cfg *Config) NewNode() (host.Host, error) {
})
return o, nil
}),
fx.Provide(func(s *swarm.Swarm, lifecycle fx.Lifecycle) (bhost.NATManager, error) {
if !cfg.EnableNATPortMap && cfg.NATManager == nil {
return nil, nil
}
if cfg.NATManager != nil {
return cfg.NATManager, nil
}
nm := natmanager.New(s)
lifecycle.Append(fx.StartStopHook(nm.Start, nm.Close))
return nm, nil
}),
fx.Provide(func() (*autonatv2.AutoNAT, error) {
if !cfg.EnableAutoNATv2 {
return nil, nil
Expand All @@ -580,6 +592,13 @@ func (cfg *Config) NewNode() (host.Host, error) {
return bh
}),
fx.Provide(func(h *swarm.Swarm) peer.ID { return h.LocalPeer() }),
fx.Provide(func(h host.Host) (*pstoremanager.PeerstoreManager, error) {
psm, err := pstoremanager.NewPeerstoreManager(h.Peerstore(), h.EventBus(), h.Network())
if err != nil {
return nil, fmt.Errorf("failed to create PeerstoreManager: %w", err)
}
return psm, nil
}),
}
transportOpts, err := cfg.addTransports()
if err != nil {
Expand All @@ -597,6 +616,11 @@ func (cfg *Config) NewNode() (host.Host, error) {
)
}

fxopts = append(fxopts, fx.Invoke(func(psm *pstoremanager.PeerstoreManager, lifecycle fx.Lifecycle) error {
lifecycle.Append(fx.StartStopHook(psm.Start, psm.Close))
return nil
}))

// enable autorelay
fxopts = append(fxopts,
fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) error {
Expand All @@ -619,6 +643,30 @@ func (cfg *Config) NewNode() (host.Host, error) {
}),
)

if !cfg.DisablePing {
fxopts = append(fxopts, fx.Invoke(func(h *bhost.BasicHost) {
ping.NewPingService(h)
}))
}

if cfg.EnableRelayService {
fxopts = append(fxopts, fx.Invoke(func(h host.Host, lifecycle fx.Lifecycle) error {
if !cfg.DisableMetrics {
// Prefer explicitly provided metrics tracer
metricsOpt := []relayv2.Option{
relayv2.WithMetricsTracer(
relayv2.NewMetricsTracer(relayv2.WithRegisterer(cfg.PrometheusRegisterer)))}
cfg.RelayServiceOpts = append(metricsOpt, cfg.RelayServiceOpts...)
}
rs, err := relayv2.New(h, cfg.RelayServiceOpts...)
if err != nil {
return err
}
lifecycle.Append(fx.StartStopHook(rs.Start, rs.Close))
return nil
}))
}

var bh *bhost.BasicHost
fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho }))
fxopts = append(fxopts, fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) {
Expand Down
46 changes: 46 additions & 0 deletions libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/pion/webrtc/v4"
quicgo "github.com/quic-go/quic-go"
wtgo "github.com/quic-go/webtransport-go"
"go.uber.org/fx"
"go.uber.org/goleak"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -917,3 +919,47 @@ func TestConnAs(t *testing.T) {
})
}
}

func TestPeerstoreManager(t *testing.T) {
ctx := t.Context()

var psm1, psm2 *pstoremanager.PeerstoreManager
// Create two hosts
h1, err := New(WithFxOption(fx.Populate(&psm1)))
require.NoError(t, err)
defer h1.Close()
require.NotNil(t, psm1)

h2, err := New(WithFxOption(fx.Populate(&psm2)))
require.NoError(t, err)
require.NotNil(t, psm2)
defer h2.Close()

// Set stream handlers to establish protocols on each host
h1.SetStreamHandler("/test/protocol/1.0.0", func(s network.Stream) {
s.Close()
})
h2.SetStreamHandler("/test/protocol/2.0.0", func(s network.Stream) {
s.Close()
})

// Connect the two hosts
err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
require.NoError(t, err)

// Disconnect the hosts
err = h1.Network().ClosePeer(h2.ID())
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)

// Check that h1 has h2's protocol info in its peerstore
h2Protocols, err := h1.Peerstore().GetProtocols(h2.ID())
require.NoError(t, err)
require.NotEmpty(t, h2Protocols, "h1 should have h2's protocol info after disconnect")

// Check that h2 has h1's protocol info in its peerstore
h1Protocols, err := h2.Peerstore().GetProtocols(h1.ID())
require.NoError(t, err)
require.NotEmpty(t, h1Protocols, "h2 should have h1's protocol info after disconnect")
}
13 changes: 11 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,22 @@ func ResourceManager(rcmgr network.ResourceManager) Option {
// NATPortMap configures libp2p to use the default NATManager. The default
// NATManager will attempt to open a port in your network's firewall using UPnP.
func NATPortMap() Option {
return NATManager(bhost.NewNATManager)
return func(cfg *Config) error {
if cfg.NATManager != nil {
return fmt.Errorf("cannot enable both NATManager and NATPortMap")
}
cfg.EnableNATPortMap = true
return nil
}
}

// NATManager will configure libp2p to use the requested NATManager. This
// function should be passed a NATManager *constructor* that takes a libp2p Network.
func NATManager(nm config.NATManagerC) Option {
func NATManager(nm bhost.NATManager) Option {
return func(cfg *Config) error {
if cfg.EnableNATPortMap {
return fmt.Errorf("cannot enable both NATManager and NATPortMap")
}
if cfg.NATManager != nil {
return fmt.Errorf("cannot specify multiple NATManagers")
}
Expand Down
16 changes: 10 additions & 6 deletions p2p/host/basic/addrs_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ type ObservedAddrsManager interface {
AddrsFor(local ma.Multiaddr) []ma.Multiaddr
}

// NATManager is a simple interface to manage NAT devices.
// It listens Listen and ListenClose notifications from the network.Network,
// and tries to obtain port mappings for those.
type NATManager interface {
GetMapping(ma.Multiaddr) ma.Multiaddr
HasDiscoveredNAT() bool
Start()
io.Closer
}

type hostAddrs struct {
addrs []ma.Multiaddr
localAddrs []ma.Multiaddr
Expand Down Expand Up @@ -156,12 +166,6 @@ func (a *addrsManager) Start() error {

func (a *addrsManager) Close() {
a.ctxCancel()
if a.natManager != nil {
err := a.natManager.Close()
if err != nil {
log.Warn("error closing natmgr", "err", err)
}
}
if a.addrsReachabilityTracker != nil {
err := a.addrsReachabilityTracker.Close()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions p2p/host/basic/addrs_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (*mockNatManager) HasDiscoveredNAT() bool {
return true
}

func (*mockNatManager) Start() {}

var _ NATManager = &mockNatManager{}

type mockObservedAddrs struct {
Expand Down
Loading
Loading