Skip to content

Commit

Permalink
Merge branch 'master' into feat/memory-transport
Browse files Browse the repository at this point in the history
  • Loading branch information
pyropy authored Nov 21, 2024
2 parents e3203f2 + 288868c commit 4164b48
Show file tree
Hide file tree
Showing 60 changed files with 2,100 additions and 233 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ concurrency:

jobs:
release-check:
uses: ipdxco/unified-github-workflows/.github/workflows/release-check.yml@v1.0
uses: marcopolo/unified-github-workflows/.github/workflows/release-check.yml@e66cb9667a2e1148efda4591e29c56258eaf385b
10 changes: 10 additions & 0 deletions FUNDING.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"opRetro": {
"projectId": "0xc71faa1bcb4ceb785816c3f22823377e9e5e7c48649badd9f0a0ce491f20d4b3"
},
"drips": {
"filecoin": {
"ownedBy": "0x53DCAf729e11022D5b8949946f6601211C662B38"
}
}
}
17 changes: 17 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import (
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcpreuse"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -142,6 +144,10 @@ type Config struct {
CustomUDPBlackHoleSuccessCounter bool
IPv6BlackHoleSuccessCounter *swarm.BlackHoleSuccessCounter
CustomIPv6BlackHoleSuccessCounter bool

UserFxOptions []fx.Option

ShareTCPListener bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -286,6 +292,12 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func(gater connmgr.ConnectionGater, rcmgr network.ResourceManager) *tcpreuse.ConnMgr {
if !cfg.ShareTCPListener {
return nil
}
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, gater, rcmgr)
}),
fx.Provide(func(cm *quicreuse.ConnManager, sw *swarm.Swarm) libp2pwebrtc.ListenUDPFn {
hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool {
quicAddrPorts := map[string]struct{}{}
Expand Down Expand Up @@ -482,6 +494,9 @@ func (cfg *Config) NewNode() (host.Host, error) {
return sw, nil
}),
fx.Provide(cfg.newBasicHost),
fx.Provide(func(bh *bhost.BasicHost) identify.IDService {
return bh.IDService()
}),
fx.Provide(func(bh *bhost.BasicHost) host.Host {
return bh
}),
Expand Down Expand Up @@ -536,6 +551,8 @@ func (cfg *Config) NewNode() (host.Host, error) {
fxopts = append(fxopts, fx.Invoke(func(bho *routed.RoutedHost) { rh = bho }))
}

fxopts = append(fxopts, cfg.UserFxOptions...)

app := fx.New(fxopts...)
if err := app.Start(context.Background()); err != nil {
return nil, err
Expand Down
23 changes: 17 additions & 6 deletions core/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,35 @@ var ErrNotFound = errors.New("routing: not found")
// type/operation.
var ErrNotSupported = errors.New("routing: operation or key not supported")

// ContentRouting is a value provider layer of indirection. It is used to find
// information about who has what content.
//
// Content is identified by CID (content identifier), which encodes a hash
// of the identified content in a future-proof manner.
type ContentRouting interface {
// ContentProviding is able to announce where to find content on the Routing
// system.
type ContentProviding interface {
// Provide adds the given cid to the content routing system. If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
Provide(context.Context, cid.Cid, bool) error
}

// ContentDiscovery is able to retrieve providers for a given CID using
// the Routing system.
type ContentDiscovery interface {
// Search for peers who are able to provide a given key
//
// When count is 0, this method will return an unbounded number of
// results.
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo
}

// ContentRouting is a value provider layer of indirection. It is used to find
// information about who has what content.
//
// Content is identified by CID (content identifier), which encodes a hash
// of the identified content in a future-proof manner.
type ContentRouting interface {
ContentProviding
ContentDiscovery
}

// PeerRouting is a way to find address information about certain peers.
// This can be implemented by a simple lookup table, a tracking server,
// or even a DHT.
Expand Down
2 changes: 1 addition & 1 deletion dashboards/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ alerting:
alertmanagers:
- scheme: http
timeout: 10s
api_version: v1
api_version: v2
static_configs:
- targets: []
scrape_configs:
Expand Down
5 changes: 0 additions & 5 deletions funding.json

This file was deleted.

60 changes: 60 additions & 0 deletions fx_options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package libp2p

import (
"testing"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
)

func TestGetPeerID(t *testing.T) {
var id peer.ID
host, err := New(
WithFxOption(fx.Populate(&id)),
)
require.NoError(t, err)
defer host.Close()

require.Equal(t, host.ID(), id)

}

func TestGetEventBus(t *testing.T) {
var eb event.Bus
host, err := New(
NoTransports,
WithFxOption(fx.Populate(&eb)),
)
require.NoError(t, err)
defer host.Close()

require.NotNil(t, eb)
}

func TestGetHost(t *testing.T) {
var h host.Host
host, err := New(
NoTransports,
WithFxOption(fx.Populate(&h)),
)
require.NoError(t, err)
defer host.Close()

require.NotNil(t, h)
}

func TestGetIDService(t *testing.T) {
var id identify.IDService
host, err := New(
NoTransports,
WithFxOption(fx.Populate(&id)),
)
require.NoError(t, err)
defer host.Close()

require.NotNil(t, id)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-multistream v0.5.0
github.com/multiformats/go-multistream v0.6.0
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.9
Expand All @@ -55,7 +55,7 @@ require (
github.com/pion/webrtc/v3 v3.3.4
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/quic-go/quic-go v0.48.0
github.com/quic-go/quic-go v0.48.1
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66
github.com/raulk/go-watchdog v1.3.0
github.com/stretchr/testify v1.9.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI1
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U=
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE=
github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA=
github.com/multiformats/go-multistream v0.6.0 h1:ZaHKbsL404720283o4c/IHQXiS6gb8qAN5EIJ4PN5EA=
github.com/multiformats/go-multistream v0.6.0/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
Expand Down Expand Up @@ -333,8 +333,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.48.0 h1:2TCyvBrMu1Z25rvIAlnp2dPT4lgh/uTqLqiXVpp5AeU=
github.com/quic-go/quic-go v0.48.0/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs=
github.com/quic-go/quic-go v0.48.1 h1:y/8xmfWI9qmGTc+lBr4jKRUWLGSlSigv847ULJ4hYXA=
github.com/quic-go/quic-go v0.48.1/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs=
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6u3sOT6pLa1kQ50ZVdm8BQFgJNA117cepZxtLIg=
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
Expand Down
26 changes: 25 additions & 1 deletion libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestTransportConstructor(t *testing.T) {
_ connmgr.ConnectionGater,
upgrader transport.Upgrader,
) transport.Transport {
tpt, err := tcp.NewTCPTransport(upgrader, nil)
tpt, err := tcp.NewTCPTransport(upgrader, nil, nil)
require.NoError(t, err)
return tpt
}
Expand Down Expand Up @@ -751,3 +751,27 @@ func getTLSConf(t *testing.T, ip net.IP, start, end time.Time) *tls.Config {
}},
}
}

func TestSharedTCPAddr(t *testing.T) {
h, err := New(
ShareTCPListener(),
Transport(tcp.NewTCPTransport),
Transport(websocket.New),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888"),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888/ws"),
)
require.NoError(t, err)
sawTCP := false
sawWS := false
for _, addr := range h.Addrs() {
if strings.HasSuffix(addr.String(), "/tcp/8888") {
sawTCP = true
}
if strings.HasSuffix(addr.String(), "/tcp/8888/ws") {
sawWS = true
}
}
require.True(t, sawTCP)
require.True(t, sawWS)
h.Close()
}
21 changes: 21 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,3 +634,24 @@ func IPv6BlackHoleSuccessCounter(f *swarm.BlackHoleSuccessCounter) Option {
return nil
}
}

// WithFxOption adds a user provided fx.Option to the libp2p constructor.
// Experimental: This option is subject to change or removal.
func WithFxOption(opts ...fx.Option) Option {
return func(cfg *Config) error {
cfg.UserFxOptions = append(cfg.UserFxOptions, opts...)
return nil
}
}

// ShareTCPListener shares the same listen address between TCP and Websocket
// transports. This lets both transports use the same TCP port.
//
// Currently this behavior is Opt-in. In a future release this will be the
// default, and this option will be removed.
func ShareTCPListener() Option {
return func(cfg *Config) error {
cfg.ShareTCPListener = true
return nil
}
}
15 changes: 12 additions & 3 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ type HostOpts struct {
// MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted.
MultistreamMuxer *msmux.MultistreamMuxer[protocol.ID]

// NegotiationTimeout determines the read and write timeouts on streams.
// If 0 or omitted, it will use DefaultNegotiationTimeout.
// If below 0, timeouts on streams will be deactivated.
// NegotiationTimeout determines the read and write timeouts when negotiating
// protocols for streams. If 0 or omitted, it will use
// DefaultNegotiationTimeout. If below 0, timeouts on streams will be
// deactivated.
NegotiationTimeout time.Duration

// AddrsFactory holds a function which can be used to override or filter the result of Addrs.
Expand Down Expand Up @@ -689,6 +690,14 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
// to create one. If ProtocolID is "", writes no header.
// (Thread-safe)
func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (str network.Stream, strErr error) {
if _, ok := ctx.Deadline(); !ok {
if h.negtimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, h.negtimeout)
defer cancel()
}
}

// If the caller wants to prevent the host from dialing, it should use the NoDial option.
if nodial, _ := network.GetNoDial(ctx); !nodial {
err := h.Connect(ctx, peer.AddrInfo{ID: p})
Expand Down
54 changes: 54 additions & 0 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package basichost

import (
"context"
"encoding/binary"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -941,3 +942,56 @@ func TestTrimHostAddrList(t *testing.T) {
})
}
}

func TestHostTimeoutNewStream(t *testing.T) {
h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h1.Start()
defer h1.Close()

const proto = "/testing"
h2 := swarmt.GenSwarm(t)

h2.SetStreamHandler(func(s network.Stream) {
// First message is multistream header. Just echo it
msHeader := []byte("\x19/multistream/1.0.0\n")
_, err := s.Read(msHeader)
assert.NoError(t, err)
_, err = s.Write(msHeader)
assert.NoError(t, err)

buf := make([]byte, 1024)
n, err := s.Read(buf)
assert.NoError(t, err)

msgLen, varintN := binary.Uvarint(buf[:n])
buf = buf[varintN:]
proto := buf[:int(msgLen)]
if string(proto) == "/ipfs/id/1.0.0\n" {
// Signal we don't support identify
na := []byte("na\n")
n := binary.PutUvarint(buf, uint64(len(na)))
copy(buf[n:], na)

_, err = s.Write(buf[:int(n)+len(na)])
assert.NoError(t, err)
} else {
// Stall
time.Sleep(5 * time.Second)
}
t.Log("Resetting")
s.Reset()
})

err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.LocalPeer(),
Addrs: h2.ListenAddresses(),
})
require.NoError(t, err)

// No context passed in, fallback to negtimeout
h1.negtimeout = time.Second
_, err = h1.NewStream(context.Background(), h2.LocalPeer(), proto)
require.Error(t, err)
require.ErrorContains(t, err, "context deadline exceeded")
}
Loading

0 comments on commit 4164b48

Please sign in to comment.