diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cd5c66ae..fc1cfd48b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,13 @@ The following emojis are used to highlight certain changes:
 ## [Unreleased]
+- ✨ `httpnet`: Transparent HTTP-block retrieval support over Trustless Gateways [#747](https://github.com/ipfs/boxo/pull/747):
+  - Complements Bitswap as a block-retrieval mechanism, implementing `bitswap/network`.
+  - Understands peers found in provider records with `/.../http` endpoints (trustless gateway).
+  - Treats them as "Bitswap" peers, except instead of using Bitswap it makes HTTP/2 requests to discover (`HEAD`) and retrieve (`GET`) individual blocks (`?format=raw`).
+  - A `bitswap/network` proxy implementation allows co-existence with the standard `bitswap/network/bsnet`.
+  - `httpnet` is not enabled by default. Upstream implementations may use it by modifying how they create the Bitswap network and initialize bitswap.
+
 ### Added
 ### Changed
diff --git a/bitswap/benchmarks_test.go b/bitswap/benchmarks_test.go index d1930c900..53e75972b 100644
--- a/bitswap/benchmarks_test.go
+++ b/bitswap/benchmarks_test.go
@@ -13,7 +13,7 @@ import (
 	"time"
 	"github.com/ipfs/boxo/bitswap"
-	bsnet "github.com/ipfs/boxo/bitswap/network"
+	bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
 	testinstance "github.com/ipfs/boxo/bitswap/testinstance"
 	tn "github.com/ipfs/boxo/bitswap/testnet"
 	mockrouting "github.com/ipfs/boxo/routing/mock"
diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 960a2c6e6..14413d5fe 100644
--- a/bitswap/client/client.go
+++ b/bitswap/client/client.go
@@ -1,4 +1,4 @@
-// Package bitswap implements the IPFS exchange interface with the BitSwap
+// Package client implements the IPFS exchange interface with the BitSwap
 // bilateral exchange protocol.
 package client
@@ -202,7 +202,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
 	sim := bssim.New()
 	bpm := bsbpm.New()
-	pm := bspm.New(ctx, peerQueueFactory, network.Self())
+	pm := bspm.New(ctx, peerQueueFactory)
 	if bs.providerFinder != nil && bs.defaultProviderQueryManager {
 		// network can do dialing.
@@ -241,7 +241,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
 		return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
 	}
 	sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
-		return bsspm.New(id, network.ConnectionManager())
+		return bsspm.New(id, network)
 	}
 	notif := notifications.New()
 	sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go index f070462d2..e8519a275 100644
--- a/bitswap/client/internal/peermanager/peermanager.go
+++ b/bitswap/client/internal/peermanager/peermanager.go
@@ -44,12 +44,10 @@ type PeerManager struct {
 	psLk         sync.RWMutex
 	sessions     map[uint64]Session
 	peerSessions map[peer.ID]map[uint64]struct{}
-
-	self peer.ID
 }
 // New creates a new PeerManager, given a context and a peerQueueFactory.
-func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager { +func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge() wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge() return &PeerManager{ @@ -57,7 +55,6 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *P pwm: newPeerWantManager(wantGauge, wantBlockGauge), createPeerQueue: createPeerQueue, ctx: ctx, - self: self, sessions: make(map[uint64]Session), peerSessions: make(map[peer.ID]map[uint64]struct{}), diff --git a/bitswap/client/internal/peermanager/peermanager_test.go b/bitswap/client/internal/peermanager/peermanager_test.go index b778c46e3..b45cd4c33 100644 --- a/bitswap/client/internal/peermanager/peermanager_test.go +++ b/bitswap/client/internal/peermanager/peermanager_test.go @@ -85,8 +85,8 @@ func TestAddingAndRemovingPeers(t *testing.T) { peerQueueFactory := makePeerQueueFactory(msgs) tp := random.Peers(6) - self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5] - peerManager := New(ctx, peerQueueFactory, self) + peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] + peerManager := New(ctx, peerQueueFactory) peerManager.Connected(peer1) peerManager.Connected(peer2) @@ -128,8 +128,8 @@ func TestBroadcastOnConnect(t *testing.T) { msgs := make(chan msg, 16) peerQueueFactory := makePeerQueueFactory(msgs) tp := random.Peers(2) - self, peer1 := tp[0], tp[1] - peerManager := New(ctx, peerQueueFactory, self) + peer1 := tp[0] + peerManager := New(ctx, peerQueueFactory) cids := random.Cids(2) peerManager.BroadcastWantHaves(ctx, cids) @@ -149,8 +149,8 @@ func TestBroadcastWantHaves(t *testing.T) { msgs := make(chan msg, 16) peerQueueFactory := makePeerQueueFactory(msgs) tp := random.Peers(3) - self, peer1, peer2 := tp[0], tp[1], tp[2] - peerManager := New(ctx, peerQueueFactory, self) + peer1, peer2 := tp[0], tp[1] + peerManager := New(ctx, peerQueueFactory) cids := random.Cids(3) @@ -190,8 +190,8 @@ func TestSendWants(t *testing.T) { msgs := make(chan msg, 16) peerQueueFactory := makePeerQueueFactory(msgs) tp := random.Peers(2) - self, peer1 := tp[0], tp[1] - peerManager := New(ctx, peerQueueFactory, self) + peer1 := tp[0] + peerManager := New(ctx, peerQueueFactory) cids := random.Cids(4) peerManager.Connected(peer1) @@ -224,8 +224,8 @@ func TestSendCancels(t *testing.T) { msgs := make(chan msg, 16) peerQueueFactory := makePeerQueueFactory(msgs) tp := random.Peers(3) - self, peer1, peer2 := tp[0], tp[1], tp[2] - peerManager := New(ctx, peerQueueFactory, self) + peer1, peer2 := tp[0], tp[1] + peerManager := New(ctx, peerQueueFactory) cids := random.Cids(4) // Connect to peer1 and peer2 @@ -285,8 +285,8 @@ func TestSessionRegistration(t *testing.T) { peerQueueFactory := makePeerQueueFactory(msgs) tp := random.Peers(3) - self, p1, p2 := tp[0], tp[1], tp[2] - peerManager := New(ctx, peerQueueFactory, self) + p1, p2 := tp[0], tp[1] + peerManager := New(ctx, peerQueueFactory) id := uint64(1) s := newSess(id) @@ -344,9 +344,8 @@ func BenchmarkPeerManager(b *testing.B) { return &benchPeerQueue{} } - self := random.Peers(1)[0] peers := random.Peers(500) - peerManager := New(ctx, peerQueueFactory, self) + peerManager := New(ctx, peerQueueFactory) // Create a bunch of connections connected := 0 diff --git a/bitswap/message/message.go b/bitswap/message/message.go index 
a550ecf05..d0548ed4e 100644 --- a/bitswap/message/message.go +++ b/bitswap/message/message.go @@ -416,26 +416,30 @@ func BlockPresenceSize(c cid.Cid) int { } // FromNet generates a new BitswapMessage from incoming data on an io.Reader. -func FromNet(r io.Reader) (BitSwapMessage, error) { +func FromNet(r io.Reader) (BitSwapMessage, int, error) { reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax) return FromMsgReader(reader) } // FromPBReader generates a new Bitswap message from a protobuf reader. -func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) { +func FromMsgReader(r msgio.Reader) (BitSwapMessage, int, error) { msg, err := r.ReadMsg() if err != nil { - return nil, err + return nil, 0, err } pb := new(pb.Message) err = proto.Unmarshal(msg, pb) r.ReleaseMsg(msg) if err != nil { - return nil, err + return nil, 0, err } - return newMessageFromProto(pb) + m, err := newMessageFromProto(pb) + if err != nil { + return nil, 0, err + } + return m, len(msg), nil } func (m *impl) ToProtoV0() *pb.Message { diff --git a/bitswap/message/message_test.go b/bitswap/message/message_test.go index 15a1eda78..82cecf1ee 100644 --- a/bitswap/message/message_test.go +++ b/bitswap/message/message_test.go @@ -116,7 +116,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { t.Fatal(err) } - copied, err := FromNet(buf) + copied, _, err := FromNet(buf) if err != nil { t.Fatal(err) } @@ -149,7 +149,7 @@ func TestToAndFromNetMessage(t *testing.T) { t.Fatal(err) } - m2, err := FromNet(buf) + m2, _, err := FromNet(buf) if err != nil { t.Fatal(err) } diff --git a/bitswap/network/bsnet/bsnet.go b/bitswap/network/bsnet/bsnet.go new file mode 100644 index 000000000..546f2c0b7 --- /dev/null +++ b/bitswap/network/bsnet/bsnet.go @@ -0,0 +1,14 @@ +package bsnet + +import "github.com/ipfs/boxo/bitswap/network/bsnet/internal" + +var ( + // ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol + ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers + // ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol + ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero + // ProtocolBitswapOneOne is the prefix for version 1.1.0 + ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne + // ProtocolBitswap is the current version of the bitswap protocol: 1.2.0 + ProtocolBitswap = internal.ProtocolBitswap +) diff --git a/bitswap/network/internal/default.go b/bitswap/network/bsnet/internal/default.go similarity index 100% rename from bitswap/network/internal/default.go rename to bitswap/network/bsnet/internal/default.go diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/bsnet/ipfs_impl.go similarity index 86% rename from bitswap/network/ipfs_impl.go rename to bitswap/network/bsnet/ipfs_impl.go index 952ee54aa..427f3ff0a 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/bsnet/ipfs_impl.go @@ -1,4 +1,4 @@ -package network +package bsnet import ( "context" @@ -9,9 +9,10 @@ import ( "time" bsmsg "github.com/ipfs/boxo/bitswap/message" - "github.com/ipfs/boxo/bitswap/network/internal" + iface "github.com/ipfs/boxo/bitswap/network" + "github.com/ipfs/boxo/bitswap/network/bsnet/internal" + logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -22,7 +23,7 @@ import ( "github.com/multiformats/go-multistream" ) -var log = logging.Logger("bitswap/network") +var log = logging.Logger("bitswap/bsnet") var ( maxSendTimeout = 2 * 
time.Minute @@ -32,7 +33,7 @@ var ( ) // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. -func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork { s := processSettings(opts...) bitswapNetwork := impl{ @@ -44,6 +45,8 @@ func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork { protocolBitswap: s.ProtocolPrefix + ProtocolBitswap, supportedProtocols: s.SupportedProtocols, + + metrics: newMetrics(), } return &bitswapNetwork @@ -65,10 +68,10 @@ func processSettings(opts ...NetOpt) Settings { type impl struct { // NOTE: Stats must be at the top of the heap allocation to ensure 64bit // alignment. - stats Stats + stats iface.Stats host host.Host - connectEvtMgr *connectEventManager + connectEvtMgr *iface.ConnectEventManager protocolBitswapNoVers protocol.ID protocolBitswapOneZero protocol.ID @@ -78,7 +81,9 @@ type impl struct { supportedProtocols []protocol.ID // inbound messages from the network are forwarded to the receiver - receivers []Receiver + receivers []iface.Receiver + + metrics *metrics } // interfaceWrapper is concrete type that wraps an interface. Necessary because @@ -108,7 +113,7 @@ type streamMessageSender struct { to peer.ID stream atomicInterface[network.Stream] bsnet *impl - opts *MessageSenderOpts + opts *iface.MessageSenderOpts } type HasContext interface { @@ -154,17 +159,6 @@ func (s *streamMessageSender) Reset() error { return nil } -// Close the stream -func (s *streamMessageSender) Close() error { - stream := s.stream.Load() - if stream != nil { - err := stream.Close() - s.stream.Store(nil) - return err - } - return nil -} - // Indicates whether the peer supports HAVE / DONT_HAVE messages func (s *streamMessageSender) SupportsHave() bool { stream := s.stream.Load() @@ -176,6 +170,15 @@ func (s *streamMessageSender) SupportsHave() bool { // Send a message to the peer, attempting multiple times func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + if n := len(msg.Wantlist()); n > 0 { + s.bsnet.metrics.WantlistsTotal.Inc() + s.bsnet.metrics.WantlistsItemsTotal.Add(float64(n)) + now := time.Now() + defer func() { + s.bsnet.metrics.WantlistsSeconds.Observe(float64(time.Since(now)) / float64(time.Second)) + }() + } + return s.multiAttempt(ctx, func() error { return s.send(ctx, msg) }) @@ -316,7 +319,7 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg. return nil } -func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) { +func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) { opts = setDefaultOpts(opts) sender := &streamMessageSender{ @@ -336,7 +339,7 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag return sender, nil } -func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts { +func setDefaultOpts(opts *iface.MessageSenderOpts) *iface.MessageSenderOpts { copy := *opts if opts.MaxRetries == 0 { copy.MaxRetries = 3 @@ -384,14 +387,14 @@ func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stre return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...) 
} -func (bsnet *impl) Start(r ...Receiver) { +func (bsnet *impl) Start(r ...iface.Receiver) { bsnet.receivers = r { - connectionListeners := make([]ConnectionListener, len(r)) + connectionListeners := make([]iface.ConnectionListener, len(r)) for i, v := range r { connectionListeners[i] = v } - bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...) + bsnet.connectEvtMgr = iface.NewConnectEventManager(connectionListeners...) } for _, proto := range bsnet.supportedProtocols { bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream) @@ -427,7 +430,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) { reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax) for { - received, err := bsmsg.FromMsgReader(reader) + received, size, err := bsmsg.FromMsgReader(reader) if err != nil { if err != io.EOF { _ = s.Reset() @@ -439,6 +442,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) { return } + bsnet.metrics.ResponseSizes.Observe(float64(size)) p := s.Conn().RemotePeer() ctx := context.Background() log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) @@ -450,12 +454,36 @@ func (bsnet *impl) handleNewStream(s network.Stream) { } } -func (bsnet *impl) ConnectionManager() connmgr.ConnManager { - return bsnet.host.ConnManager() +func (bsnet *impl) TagPeer(p peer.ID, tag string, w int) { + if bsnet.host == nil { + return + } + bsnet.host.ConnManager().TagPeer(p, tag, w) +} + +func (bsnet *impl) UntagPeer(p peer.ID, tag string) { + if bsnet.host == nil { + return + } + bsnet.host.ConnManager().UntagPeer(p, tag) +} + +func (bsnet *impl) Protect(p peer.ID, tag string) { + if bsnet.host == nil { + return + } + bsnet.host.ConnManager().Protect(p, tag) +} + +func (bsnet *impl) Unprotect(p peer.ID, tag string) bool { + if bsnet.host == nil { + return false + } + return bsnet.host.ConnManager().Unprotect(p, tag) } -func (bsnet *impl) Stats() Stats { - return Stats{ +func (bsnet *impl) Stats() iface.Stats { + return iface.Stats{ MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd), MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent), } diff --git a/bitswap/network/ipfs_impl_test.go b/bitswap/network/bsnet/ipfs_impl_test.go similarity index 93% rename from bitswap/network/ipfs_impl_test.go rename to bitswap/network/bsnet/ipfs_impl_test.go index bfba5709d..670cea6f4 100644 --- a/bitswap/network/ipfs_impl_test.go +++ b/bitswap/network/bsnet/ipfs_impl_test.go @@ -1,4 +1,4 @@ -package network_test +package bsnet_test import ( "context" @@ -10,13 +10,14 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" pb "github.com/ipfs/boxo/bitswap/message/pb" - bsnet "github.com/ipfs/boxo/bitswap/network" - "github.com/ipfs/boxo/bitswap/network/internal" + network "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" + "github.com/ipfs/boxo/bitswap/network/bsnet/internal" tn "github.com/ipfs/boxo/bitswap/testnet" "github.com/ipfs/go-test/random" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" + p2pnet "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -30,7 +31,7 @@ type receiver struct { connectionEvent chan bool lastMessage bsmsg.BitSwapMessage lastSender peer.ID - listener network.Notifiee + listener p2pnet.Notifiee } func newReceiver() *receiver { @@ -71,7 +72,7 @@ func (r *receiver) PeerDisconnected(p 
peer.ID) { var errMockNetErr = errors.New("network err") type ErrStream struct { - network.Stream + p2pnet.Stream lk sync.Mutex err error timingOut bool @@ -107,6 +108,14 @@ func (es *ErrStream) Close() error { return es.Stream.Close() } +func (es *ErrStream) Reset() error { + es.lk.Lock() + es.closed = true + es.lk.Unlock() + + return es.Stream.Reset() +} + func (eh *ErrHost) Connect(ctx context.Context, pi peer.AddrInfo) error { eh.lk.Lock() defer eh.lk.Unlock() @@ -120,7 +129,7 @@ func (eh *ErrHost) Connect(ctx context.Context, pi peer.AddrInfo) error { return eh.Host.Connect(ctx, pi) } -func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) { +func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (p2pnet.Stream, error) { eh.lk.Lock() defer eh.lk.Unlock() @@ -268,7 +277,7 @@ func TestMessageSendAndReceive(t *testing.T) { } } -func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, bsnet.BitSwapNetwork, *ErrHost, bsnet.BitSwapNetwork, bsmsg.BitSwapMessage) { +func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, network.BitSwapNetwork, *ErrHost, network.BitSwapNetwork, bsmsg.BitSwapMessage) { // create network mn := mocknet.New() defer mn.Close() @@ -337,7 +346,7 @@ func TestMessageResendAfterError(t *testing.T) { eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2) testSendErrorBackoff := 100 * time.Millisecond - ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{ MaxRetries: 3, SendTimeout: 100 * time.Millisecond, SendErrorBackoff: testSendErrorBackoff, @@ -345,7 +354,7 @@ func TestMessageResendAfterError(t *testing.T) { if err != nil { t.Fatal(err) } - defer ms.Close() + defer ms.Reset() // Return an error from the networking layer the next time we try to send // a message @@ -382,7 +391,7 @@ func TestMessageSendTimeout(t *testing.T) { eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2) - ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{ MaxRetries: 3, SendTimeout: 100 * time.Millisecond, SendErrorBackoff: 100 * time.Millisecond, @@ -390,7 +399,7 @@ func TestMessageSendTimeout(t *testing.T) { if err != nil { t.Fatal(err) } - defer ms.Close() + defer ms.Reset() // Return a DeadlineExceeded error from the networking layer the next time we try to // send a message @@ -424,13 +433,13 @@ func TestMessageSendNotSupportedResponse(t *testing.T) { eh, bsnet1, _, _, _ := prepareNetwork(t, ctx, p1, r1, p2, r2) eh.setError(multistream.ErrNotSupported[protocol.ID]{}) - ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{ MaxRetries: 3, SendTimeout: 100 * time.Millisecond, SendErrorBackoff: 100 * time.Millisecond, }) if err == nil { - ms.Close() + ms.Reset() t.Fatal("Expected ErrNotSupported") } @@ -482,11 +491,11 @@ func TestSupportsHave(t *testing.T) { t.Fatal(err) } - senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{}) + senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{}) if err != nil { t.Fatal(err) } - defer senderCurrent.Close() + defer senderCurrent.Reset() if 
senderCurrent.SupportsHave() != tc.expSupportsHave { t.Fatal("Expected sender HAVE message support", tc.proto, tc.expSupportsHave) @@ -532,11 +541,11 @@ func testNetworkCounters(t *testing.T, n1 int, n2 int) { } if n2 > 0 { - ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{}) + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{}) if err != nil { t.Fatal(err) } - defer ms.Close() + defer ms.Reset() for n := 0; n < n2; n++ { ctx, cancel := context.WithTimeout(ctx, time.Second) err = ms.SendMsg(ctx, msg) @@ -561,7 +570,7 @@ func testNetworkCounters(t *testing.T, n1 int, n2 int) { } cancel() } - ms.Close() + ms.Reset() } // Wait until all streams are closed and MessagesRecvd counters diff --git a/bitswap/network/ipfs_impl_timeout_test.go b/bitswap/network/bsnet/ipfs_impl_timeout_test.go similarity index 97% rename from bitswap/network/ipfs_impl_timeout_test.go rename to bitswap/network/bsnet/ipfs_impl_timeout_test.go index fdbe8e950..994804235 100644 --- a/bitswap/network/ipfs_impl_timeout_test.go +++ b/bitswap/network/bsnet/ipfs_impl_timeout_test.go @@ -1,4 +1,4 @@ -package network +package bsnet import ( "testing" diff --git a/bitswap/network/bsnet/metrics.go b/bitswap/network/bsnet/metrics.go new file mode 100644 index 000000000..442bc2b29 --- /dev/null +++ b/bitswap/network/bsnet/metrics.go @@ -0,0 +1,45 @@ +package bsnet + +import ( + "context" + + imetrics "github.com/ipfs/go-metrics-interface" +) + +var durationHistogramBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30, 60, 120, 240, 480, 960, 1920} + +var blockSizesHistogramBuckets = []float64{1, 128 << 10, 256 << 10, 512 << 10, 1024 << 10, 2048 << 10, 4092 << 10} + +func responseSizes(ctx context.Context) imetrics.Histogram { + return imetrics.NewCtx(ctx, "response_bytes", "Histogram of bitswap response sizes").Histogram(blockSizesHistogramBuckets) +} + +func wantlistsTotal(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "wantlists_total", "Total number of wantlists sent").Counter() +} + +func wantlistsItemsTotal(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "wantlists_items_total", "Total number of elements in sent wantlists").Counter() +} + +func wantlistsSeconds(ctx context.Context) imetrics.Histogram { + return imetrics.NewCtx(ctx, "wantlists_seconds", "Number of seconds spent sending wantlists").Histogram(durationHistogramBuckets) +} + +type metrics struct { + WantlistsTotal imetrics.Counter + WantlistsItemsTotal imetrics.Counter + WantlistsSeconds imetrics.Histogram + ResponseSizes imetrics.Histogram +} + +func newMetrics() *metrics { + ctx := imetrics.CtxScope(context.Background(), "exchange_bitswap") + + return &metrics{ + WantlistsTotal: wantlistsTotal(ctx), + WantlistsItemsTotal: wantlistsItemsTotal(ctx), + WantlistsSeconds: wantlistsSeconds(ctx), + ResponseSizes: responseSizes(ctx), + } +} diff --git a/bitswap/network/options.go b/bitswap/network/bsnet/options.go similarity index 96% rename from bitswap/network/options.go rename to bitswap/network/bsnet/options.go index 10d02e5e9..cbe355c0f 100644 --- a/bitswap/network/options.go +++ b/bitswap/network/bsnet/options.go @@ -1,4 +1,4 @@ -package network +package bsnet import "github.com/libp2p/go-libp2p/core/protocol" diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go index bf3766089..440aa0489 100644 --- a/bitswap/network/connecteventmanager.go +++ b/bitswap/network/connecteventmanager.go @@ -4,9 +4,12 @@ import ( "sync" 
"github.com/gammazero/deque" + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/peer" ) +var log = logging.Logger("bitswap/connevtman") + type ConnectionListener interface { PeerConnected(peer.ID) PeerDisconnected(peer.ID) @@ -20,7 +23,7 @@ const ( stateUnresponsive ) -type connectEventManager struct { +type ConnectEventManager struct { connListeners []ConnectionListener lk sync.RWMutex cond sync.Cond @@ -36,8 +39,8 @@ type peerState struct { pending bool } -func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager { - evtManager := &connectEventManager{ +func NewConnectEventManager(connListeners ...ConnectionListener) *ConnectEventManager { + evtManager := &ConnectEventManager{ connListeners: connListeners, peers: make(map[peer.ID]*peerState), done: make(chan struct{}), @@ -46,11 +49,11 @@ func newConnectEventManager(connListeners ...ConnectionListener) *connectEventMa return evtManager } -func (c *connectEventManager) Start() { +func (c *ConnectEventManager) Start() { go c.worker() } -func (c *connectEventManager) Stop() { +func (c *ConnectEventManager) Stop() { c.lk.Lock() c.stop = true c.lk.Unlock() @@ -59,7 +62,7 @@ func (c *connectEventManager) Stop() { <-c.done } -func (c *connectEventManager) getState(p peer.ID) state { +func (c *ConnectEventManager) getState(p peer.ID) state { if state, ok := c.peers[p]; ok { return state.newState } else { @@ -67,7 +70,7 @@ func (c *connectEventManager) getState(p peer.ID) state { } } -func (c *connectEventManager) setState(p peer.ID, newState state) { +func (c *ConnectEventManager) setState(p peer.ID, newState state) { state, ok := c.peers[p] if !ok { state = new(peerState) @@ -83,14 +86,14 @@ func (c *connectEventManager) setState(p peer.ID, newState state) { // Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the // connect event manager has been stopped. -func (c *connectEventManager) waitChange() bool { +func (c *ConnectEventManager) waitChange() bool { for !c.stop && c.changeQueue.Len() == 0 { c.cond.Wait() } return !c.stop } -func (c *connectEventManager) worker() { +func (c *ConnectEventManager) worker() { c.lk.Lock() defer c.lk.Unlock() defer close(c.done) @@ -145,7 +148,7 @@ func (c *connectEventManager) worker() { } // Called whenever we receive a new connection. May be called many times. -func (c *connectEventManager) Connected(p peer.ID) { +func (c *ConnectEventManager) Connected(p peer.ID) { c.lk.Lock() defer c.lk.Unlock() @@ -158,7 +161,7 @@ func (c *connectEventManager) Connected(p peer.ID) { } // Called when we drop the final connection to a peer. -func (c *connectEventManager) Disconnected(p peer.ID) { +func (c *ConnectEventManager) Disconnected(p peer.ID) { c.lk.Lock() defer c.lk.Unlock() @@ -172,7 +175,7 @@ func (c *connectEventManager) Disconnected(p peer.ID) { } // Called whenever a peer is unresponsive. -func (c *connectEventManager) MarkUnresponsive(p peer.ID) { +func (c *ConnectEventManager) MarkUnresponsive(p peer.ID) { c.lk.Lock() defer c.lk.Unlock() @@ -191,7 +194,7 @@ func (c *connectEventManager) MarkUnresponsive(p peer.ID) { // - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process // // the "on message" event, so we can't treat this as evidence of a connection. 
-func (c *connectEventManager) OnMessage(p peer.ID) { +func (c *ConnectEventManager) OnMessage(p peer.ID) { c.lk.RLock() unresponsive := c.getState(p) == stateUnresponsive c.lk.RUnlock() diff --git a/bitswap/network/connecteventmanager_test.go b/bitswap/network/connecteventmanager_test.go index 5d57fc104..90feee65e 100644 --- a/bitswap/network/connecteventmanager_test.go +++ b/bitswap/network/connecteventmanager_test.go @@ -36,7 +36,7 @@ func (cl *mockConnListener) PeerDisconnected(p peer.ID) { cl.events = append(cl.events, mockConnEvent{connected: false, peer: p}) } -func wait(t *testing.T, c *connectEventManager) { +func wait(t *testing.T, c *ConnectEventManager) { require.Eventually(t, func() bool { c.lk.RLock() defer c.lk.RUnlock() @@ -47,7 +47,7 @@ func wait(t *testing.T, c *connectEventManager) { func TestConnectEventManagerConnectDisconnect(t *testing.T) { connListener := newMockConnListener() peers := random.Peers(2) - cem := newConnectEventManager(connListener) + cem := NewConnectEventManager(connListener) cem.Start() t.Cleanup(cem.Stop) @@ -86,7 +86,7 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) { func TestConnectEventManagerMarkUnresponsive(t *testing.T) { connListener := newMockConnListener() p := random.Peers(1)[0] - cem := newConnectEventManager(connListener) + cem := NewConnectEventManager(connListener) cem.Start() t.Cleanup(cem.Stop) @@ -135,7 +135,7 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) { func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { connListener := newMockConnListener() p := random.Peers(1)[0] - cem := newConnectEventManager(connListener) + cem := NewConnectEventManager(connListener) cem.Start() t.Cleanup(cem.Stop) diff --git a/bitswap/network/http_multiaddr.go b/bitswap/network/http_multiaddr.go new file mode 100644 index 000000000..de1da36e5 --- /dev/null +++ b/bitswap/network/http_multiaddr.go @@ -0,0 +1,129 @@ +package network + +import ( + "fmt" + "net" + "net/url" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +// ParsedURL contains the result of parsing an "http" transport multiaddress. +// SNI is set when the multiaddress specifies an SNI value. +type ParsedURL struct { + URL *url.URL + SNI string +} + +// ExtractHTTPAddress extracts the HTTP schema+host+port from a multiaddress +// and returns a *url.URL and an SNI string if present. 
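The multiaddr-to-URL mapping implemented below is easiest to see on concrete inputs; this usage sketch mirrors the table-driven tests further down, with `example.com`/`example2.com` as placeholders:

```go
package main

import (
	"fmt"

	"github.com/ipfs/boxo/bitswap/network"
	"github.com/multiformats/go-multiaddr"
)

func main() {
	// A TLS gateway address with an explicit SNI component: the scheme
	// becomes https and the SNI value is carried separately in ParsedURL.
	ma := multiaddr.StringCast("/dns4/example.com/tcp/443/tls/sni/example2.com/http")

	purl, err := network.ExtractHTTPAddress(ma)
	if err != nil {
		panic(err)
	}
	fmt.Println(purl.URL) // https://example.com:443
	fmt.Println(purl.SNI) // example2.com

	// Plain-HTTP addresses are only accepted for loopback/private IPs:
	_, err = network.ExtractHTTPAddress(
		multiaddr.StringCast("/dns4/example.com/tcp/8080/http"))
	fmt.Println(err != nil) // true: neither TLS nor a local address
}
```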
+func ExtractHTTPAddress(ma multiaddr.Multiaddr) (ParsedURL, error) { + components := ma.Protocols() + var host, port, schema, sni string + var tls bool + + for _, comp := range components { + switch comp.Name { + case "dns", "dns4", "dns6", "ip4", "ip6": + hostVal, err := ma.ValueForProtocol(comp.Code) + if err != nil { + return ParsedURL{}, fmt.Errorf("failed to extract host: %w", err) + } + host = hostVal + case "tcp", "udp": + portVal, err := ma.ValueForProtocol(comp.Code) + if err != nil { + return ParsedURL{}, fmt.Errorf("failed to extract port: %w", err) + } + port = portVal + case "tls": + tls = true + case "http": + schema = "http" + if tls { + schema = "https" + } + case "https": + schema = "https" + case "sni": + schema = "https" + sniVal, err := ma.ValueForProtocol(comp.Code) + if err != nil { + return ParsedURL{}, fmt.Errorf("failed to extract SNI: %w", err) + } + sni = sniVal + } + } + + if host == "" || port == "" || schema == "" { + return ParsedURL{}, fmt.Errorf("multiaddress is missing required components (host/port/schema)") + } + + // Construct the URL object + address := fmt.Sprintf("%s://%s:%s", schema, host, port) + pURL, err := url.Parse(address) + if err != nil { + return ParsedURL{}, fmt.Errorf("failed to parse URL: %w", err) + } + + parsedURL := ParsedURL{ + URL: pURL, + SNI: sni, + } + + // Error on addresses which are not https nor local + ip := net.ParseIP(host) + if ip != nil { + if schema != "https" && !(ip.IsLoopback() || ip.IsPrivate()) { + return parsedURL, fmt.Errorf("multiaddress is not a TLS endpoint nor a local or private IP address") + } + } else if schema != "https" { + return parsedURL, fmt.Errorf("multiaddress is not a TLS endpoint nor a local or private IP address") + } + + return parsedURL, nil +} + +// ExtractURLsFromPeer extracts all HTTP schema+host+port addresses as ParsedURL from a peer.AddrInfo object. +func ExtractURLsFromPeer(info peer.AddrInfo) []ParsedURL { + var addresses []ParsedURL + + for _, addr := range info.Addrs { + purl, err := ExtractHTTPAddress(addr) + if err != nil { + // Skip invalid or non-HTTP addresses but continue with others + continue + } + addresses = append(addresses, purl) + } + + return addresses +} + +// SplitHTTPAddrs splits a peer.AddrInfo into two: one containing HTTP/HTTPS addresses, and the other containing the rest. +func SplitHTTPAddrs(pi peer.AddrInfo) (httpPeer peer.AddrInfo, otherPeer peer.AddrInfo) { + httpPeer.ID = pi.ID + otherPeer.ID = pi.ID + + for _, addr := range pi.Addrs { + if isHTTPAddress(addr) { + httpPeer.Addrs = append(httpPeer.Addrs, addr) + } else { + otherPeer.Addrs = append(otherPeer.Addrs, addr) + } + } + + return +} + +// isHTTPAddress checks if a multiaddress is an HTTP or HTTPS address. 
+func isHTTPAddress(ma multiaddr.Multiaddr) bool { + protocols := ma.Protocols() + for _, proto := range protocols { + if proto.Name == "http" || proto.Name == "https" { + return true + } + } + return false +} diff --git a/bitswap/network/http_multiaddr_test.go b/bitswap/network/http_multiaddr_test.go new file mode 100644 index 000000000..5807e6389 --- /dev/null +++ b/bitswap/network/http_multiaddr_test.go @@ -0,0 +1,211 @@ +package network + +import ( + "net/url" + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +func TestExtractHTTPAddress(t *testing.T) { + tests := []struct { + name string + maStr string + want *url.URL + sni string + expectErr bool + }{ + { + name: "Valid HTTP multiaddress with DNS", + maStr: "/dns4/example.com/tcp/8080/http", + want: &url.URL{ + Scheme: "http", + Host: "example.com:8080", + }, + expectErr: true, // error due to non-local address and no TLS. + }, + { + name: "Valid HTTPS multiaddress with DNS4", + maStr: "/dns4/example.com/tcp/443/https", + want: &url.URL{ + Scheme: "https", + Host: "example.com:443", + }, + expectErr: false, + }, + { + name: "Valid HTTPS multiaddress with DNS6", + maStr: "/dns6/example.com/tcp/443/https", + want: &url.URL{ + Scheme: "https", + Host: "example.com:443", + }, + expectErr: false, + }, + { + name: "Valid HTTPS multiaddress with DNS", + maStr: "/dns/example.com/tcp/443/https", + want: &url.URL{ + Scheme: "https", + Host: "example.com:443", + }, + expectErr: false, + }, + { + + name: "Valid HTTPS multiaddress with DNS", + maStr: "/dns4/example.com/tcp/443/https", + want: &url.URL{ + Scheme: "https", + Host: "example.com:443", + }, + expectErr: false, + }, + { + name: "Valid WSS multiaddress with DNS", + maStr: "/dns4/example.com/tcp/443/wss", + want: nil, + expectErr: true, // error due to wss: we need HTTPs + }, + { + name: "Valid HTTP multiaddress with IP4", + maStr: "/ip4/127.0.0.1/tcp/8080/http", + want: &url.URL{ + Scheme: "http", + Host: "127.0.0.1:8080", + }, + expectErr: false, + }, + { + name: "Missing port", + maStr: "/dns4/example.com/http", + want: nil, + expectErr: true, + }, + { + name: "Invalid multiaddress", + maStr: "/dns4/example.com/tcp/abc/http", + want: nil, + expectErr: true, + }, + { + name: "Unsupported protocol", + maStr: "/unix/tmp/socket", + want: nil, + expectErr: true, + }, + { + name: "Valid HTTP multiaddress with IP6", + maStr: "/ip6/::1/tcp/8080/http", + want: &url.URL{ + Scheme: "http", + Host: "::1:8080", + }, + expectErr: false, + }, + { + name: "tls/http multiaddress without sni", + maStr: "/ip4/127.0.0.1/tcp/8080/tls/http", + want: &url.URL{ + Scheme: "https", + Host: "127.0.0.1:8080", + }, + expectErr: false, + }, + { + name: "tls/http with sni", + maStr: "/dns4/example.com/tcp/443/tls/sni/example2.com/http", + want: &url.URL{ + Scheme: "https", + Host: "example.com:443", + }, + sni: "example2.com", + expectErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ma, err := multiaddr.NewMultiaddr(tt.maStr) + if err != nil { + if !tt.expectErr { + t.Fatalf("failed to create multiaddress: %v", err) + } + return + } + + got, err := ExtractHTTPAddress(ma) + if (err != nil) != tt.expectErr { + t.Errorf("got: %s", got.URL) + t.Errorf("ExtractHTTPAddress() error = %v, wantErr %v", err, tt.expectErr) + return + } + + if tt.want != nil && (got.URL == nil || got.URL.String() != tt.want.String() || tt.sni != got.SNI) { + t.Errorf("ExtractHTTPAddress() = %v (%s), want %v (%s)", got.URL, got.SNI, tt.want, tt.sni) + 
} + }) + } +} + +func TestExtractHTTPAddressesFromPeer(t *testing.T) { + tests := []struct { + name string + peerInfo *peer.AddrInfo + want []*url.URL + }{ + { + name: "Valid peer with multiple addresses", + peerInfo: &peer.AddrInfo{ + ID: "12D3KooWQrKv5jtT5anTrKjwgb5dkt7DYHhTT9JzLs7dABZ1mkTf", + Addrs: []multiaddr.Multiaddr{ + multiaddr.StringCast("/dns4/example.com/tcp/8080/http"), + multiaddr.StringCast("/ip4/127.0.0.1/tcp/8081/http"), + multiaddr.StringCast("/ip4/127.0.0.1/tcp/9000"), // Non-HTTP + }, + }, + want: []*url.URL{ + { + Scheme: "http", + Host: "127.0.0.1:8081", + }, + }, + }, + { + name: "No valid HTTP addresses in peer", + peerInfo: &peer.AddrInfo{ + ID: "12D3KooWQrKv5jtT5anTrKjwgb5dkt7DYHhTT9JzLs7dABZ1mkTf", + Addrs: []multiaddr.Multiaddr{ + multiaddr.StringCast("/ip4/127.0.0.1/tcp/9000"), // Non-HTTP + }, + }, + want: nil, + }, + { + name: "Empty peer info", + peerInfo: &peer.AddrInfo{ + ID: "12D3KooWQrKv5jtT5anTrKjwgb5dkt7DYHhTT9JzLs7dABZ1mkTf", + Addrs: []multiaddr.Multiaddr{}, + }, + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ExtractURLsFromPeer(*tt.peerInfo) + if len(got) != len(tt.want) { + t.Errorf("ExtractHTTPAddressesFromPeer() = %v, want %v", got, tt.want) + return + } + + // Compare URLs + for i := range got { + if got[i].URL.String() != tt.want[i].String() { + t.Errorf("ExtractHTTPAddressesFromPeer() URL at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} diff --git a/bitswap/network/httpnet/cooldown.go b/bitswap/network/httpnet/cooldown.go new file mode 100644 index 000000000..37d66990f --- /dev/null +++ b/bitswap/network/httpnet/cooldown.go @@ -0,0 +1,99 @@ +package httpnet + +import ( + "sync" + "time" + + "github.com/ipfs/boxo/bitswap/network" +) + +type cooldownTracker struct { + maxBackoff time.Duration + + urlsLock sync.RWMutex + urls map[string]time.Time + + stop chan struct{} +} + +func newCooldownTracker(maxBackoff time.Duration) *cooldownTracker { + ct := &cooldownTracker{ + maxBackoff: maxBackoff, + urls: make(map[string]time.Time), + stop: make(chan struct{}), + } + + go ct.cleaner() + return ct +} + +// every minute clean expired cooldowns. 
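The tracker above is how `httpnet` honors `Retry-After` backpressure from gateways (the test handler at the end of this diff answers 429 with `Retry-After: 5` for `backoffCid`). A package-internal sketch of the intended call pattern; the host string is a placeholder, and the real call sites live in the message sender, outside this hunk:

```go
package httpnet

import "time"

// cooldownSketch illustrates (hypothetically) how the tracker is driven
// when a gateway responds 429/503 with a Retry-After header.
func cooldownSketch() {
	ct := newCooldownTracker(DefaultMaxBackoff)
	defer ct.stopCleaner()

	// Retry-After given as seconds: the cooldown is capped at maxBackoff
	// (one minute by default), so a hostile server cannot park a URL for
	// hours.
	ct.setByDuration("gateway.example.com:443", 5*time.Hour) // stored as 1m

	// Retry-After given as an HTTP date is capped the same way.
	ct.setByDate("gateway.example.com:443", time.Now().Add(10*time.Second))

	// A successful request clears the cooldown early; otherwise the
	// cleaner goroutine expires it once the deadline passes.
	ct.remove("gateway.example.com:443")
}
```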
+func (ct *cooldownTracker) cleaner() { + tick := time.NewTicker(time.Minute) + for { + select { + case <-ct.stop: + return + case now := <-tick.C: + ct.urlsLock.Lock() + for host, dl := range ct.urls { + if dl.Before(now) { + delete(ct.urls, host) + } + } + ct.urlsLock.Unlock() + } + } +} + +func (ct *cooldownTracker) stopCleaner() { + close(ct.stop) +} + +func (ct *cooldownTracker) setByDate(host string, t time.Time) { + latestDate := time.Now().Add(ct.maxBackoff) + if t.After(latestDate) { + t = latestDate + } + ct.urlsLock.Lock() + ct.urls[host] = t + ct.urlsLock.Unlock() +} + +func (ct *cooldownTracker) setByDuration(host string, d time.Duration) { + if d > ct.maxBackoff { + d = ct.maxBackoff + } + ct.urlsLock.Lock() + ct.urls[host] = time.Now().Add(d) + ct.urlsLock.Unlock() +} + +func (ct *cooldownTracker) remove(host string) { + ct.urlsLock.Lock() + delete(ct.urls, host) + ct.urlsLock.Unlock() +} + +func (ct *cooldownTracker) fillSenderURLs(urls []network.ParsedURL) []*senderURL { + now := time.Now() + surls := make([]*senderURL, len(urls)) + ct.urlsLock.RLock() + { + + for i, u := range urls { + var cooldown time.Time + dl, ok := ct.urls[u.URL.Host] + if ok && now.Before(dl) { + cooldown = dl + } + surls[i] = &senderURL{ + ParsedURL: u, + } + surls[i].cooldown.Store(cooldown) + + } + } + ct.urlsLock.RUnlock() + return surls +} diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go new file mode 100644 index 000000000..0a201523e --- /dev/null +++ b/bitswap/network/httpnet/httpnet.go @@ -0,0 +1,719 @@ +// Package httpnet implements an Exchange network that sends and receives +// Exchange messages from peers' HTTP endpoints. +package httpnet + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "reflect" + "runtime/debug" + "strings" + "sync" + "sync/atomic" + "time" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + "github.com/ipfs/boxo/bitswap/network" + blocks "github.com/ipfs/go-block-format" + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/multiformats/go-multiaddr" +) + +var log = logging.Logger("httpnet") + +var ErrNoHTTPAddresses = errors.New("AddrInfo does not contain any valid HTTP addresses") +var ErrNoSuccess = errors.New("none of the peer HTTP endpoints responded successfully to request") + +var _ network.BitSwapNetwork = (*Network)(nil) + +var ( + // DefaultUserAgent is sent as a header in all requests. + DefaultUserAgent = defaultUserAgent() // Usually will result in a "boxo@commitID" +) + +// Defaults for the configurable options. +const ( + DefaultMaxBlockSize int64 = 2 << 20 // 2MiB: https://specs.ipfs.tech/bitswap-protocol/#block-sizes + DefaultDialTimeout = 5 * time.Second + DefaultIdleConnTimeout = 30 * time.Second + DefaultResponseHeaderTimeout = 10 * time.Second + DefaultMaxIdleConns = 50 + DefaultInsecureSkipVerify = false + DefaultMaxBackoff = time.Minute + DefaultMaxHTTPAddressesPerPeer = 10 + DefaultHTTPWorkers = 64 +) + +var pingCid = "bafkqaaa" // identity CID + +const http2proto = "HTTP/2.0" + +const peerstoreSupportsHeadKey = "http-retrieval-head-support" + +// Option allows to configure the Network. +type Option func(net *Network) + +// WithUserAgent sets the user agent when making requests. 
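Configuration follows the functional-options pattern declared below. A sketch of constructing the network with a few options; the host creation and all option values are illustrative:

```go
package main

import (
	"time"

	"github.com/ipfs/boxo/bitswap/network/httpnet"
	"github.com/libp2p/go-libp2p"
)

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}

	// Unset options keep the Default* values declared in this file.
	net := httpnet.New(h,
		httpnet.WithUserAgent("my-app/1.0"),
		httpnet.WithResponseHeaderTimeout(10*time.Second),
		httpnet.WithMaxHTTPAddressesPerPeer(5),
		// Only talk HTTP to this host; also tags status metrics per host.
		httpnet.WithAllowlist([]string{"trusted-gateway.example.com"}),
	)
	_ = net // wired into bitswap via Start(); see the lifecycle sketch below
}
```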
+func WithUserAgent(agent string) Option {
+	return func(net *Network) {
+		net.userAgent = agent
+	}
+}
+
+// WithMaxBlockSize sets the maximum size of an HTTP response (block).
+func WithMaxBlockSize(size int64) Option {
+	return func(net *Network) {
+		net.maxBlockSize = size
+	}
+}
+
+// WithDialTimeout sets the maximum time to wait for a connection to be set up.
+func WithDialTimeout(t time.Duration) Option {
+	return func(net *Network) {
+		net.dialTimeout = t
+	}
+}
+
+// WithIdleConnTimeout sets how long to keep connections alive before closing
+// them when no requests happen.
+func WithIdleConnTimeout(t time.Duration) Option {
+	return func(net *Network) {
+		net.idleConnTimeout = t
+	}
+}
+
+// WithResponseHeaderTimeout sets how long to wait for a response to start
+// arriving. It is the time given to the provider to find and start sending
+// the block. It does not affect the time it takes to download the response body.
+func WithResponseHeaderTimeout(t time.Duration) Option {
+	return func(net *Network) {
+		net.responseHeaderTimeout = t
+	}
+}
+
+// WithMaxIdleConns sets how many keep-alive connections we can have where no
+// requests are happening.
+func WithMaxIdleConns(n int) Option {
+	return func(net *Network) {
+		net.maxIdleConns = n
+	}
+}
+
+// WithInsecureSkipVerify allows making HTTPS connections to test servers.
+// Use for testing only.
+func WithInsecureSkipVerify(b bool) Option {
+	return func(net *Network) {
+		net.insecureSkipVerify = b
+	}
+}
+
+// WithAllowlist sets the hostnames that we are allowed to connect to via
+// HTTP. Additionally, HTTP response status metrics are tagged for each of
+// these hosts.
+func WithAllowlist(hosts []string) Option {
+	return func(net *Network) {
+		net.allowlist = make(map[string]struct{})
+		for _, h := range hosts {
+			net.allowlist[h] = struct{}{}
+		}
+	}
+}
+
+// WithMaxHTTPAddressesPerPeer limits how many HTTP addresses we attempt to
+// connect to per peer.
+func WithMaxHTTPAddressesPerPeer(max int) Option {
+	return func(net *Network) {
+		net.maxHTTPAddressesPerPeer = max
+	}
+}
+
+// WithHTTPWorkers controls how many HTTP requests can be done concurrently.
+func WithHTTPWorkers(n int) Option {
+	return func(net *Network) {
+		net.httpWorkers = n
+	}
+}
+
+type Network struct {
+	// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
+	// alignment.
+	stats network.Stats
+
+	host   host.Host
+	client *http.Client
+
+	closeOnce       sync.Once
+	closing         chan struct{}
+	receivers       []network.Receiver
+	connEvtMgr      *network.ConnectEventManager
+	pinger          *pinger
+	requestTracker  *requestTracker
+	cooldownTracker *cooldownTracker
+
+	// options
+	userAgent               string
+	maxBlockSize            int64
+	dialTimeout             time.Duration
+	idleConnTimeout         time.Duration
+	responseHeaderTimeout   time.Duration
+	maxIdleConns            int
+	insecureSkipVerify      bool
+	maxHTTPAddressesPerPeer int
+	httpWorkers             int
+	allowlist               map[string]struct{}
+
+	metrics      *metrics
+	httpRequests chan httpRequestInfo
+}
+
+type httpRequestInfo struct {
+	ctx       context.Context
+	sender    *httpMsgSender
+	entry     bsmsg.Entry
+	result    chan<- httpResult
+	startTime time.Time
+}
+
+type httpResult struct {
+	info  httpRequestInfo
+	block blocks.Block
+	err   *senderError
+}
+
+// New returns a BitSwapNetwork supported by underlying IPFS host.
+func New(host host.Host, opts ...Option) network.BitSwapNetwork { + htnet := &Network{ + host: host, + closing: make(chan struct{}), + userAgent: defaultUserAgent(), + maxBlockSize: DefaultMaxBlockSize, + dialTimeout: DefaultDialTimeout, + idleConnTimeout: DefaultIdleConnTimeout, + responseHeaderTimeout: DefaultResponseHeaderTimeout, + maxIdleConns: DefaultMaxIdleConns, + insecureSkipVerify: DefaultInsecureSkipVerify, + maxHTTPAddressesPerPeer: DefaultMaxHTTPAddressesPerPeer, + httpWorkers: DefaultHTTPWorkers, + httpRequests: make(chan httpRequestInfo), + } + + for _, opt := range opts { + opt(htnet) + } + + htnet.metrics = newMetrics(htnet.allowlist) + + reqTracker := newRequestTracker() + htnet.requestTracker = reqTracker + + cooldownTracker := newCooldownTracker(DefaultMaxBackoff) + htnet.cooldownTracker = cooldownTracker + + netdialer := &net.Dialer{ + // Timeout for connects to complete. + Timeout: htnet.dialTimeout, + KeepAlive: 15 * time.Second, + // TODO for go1.23 + // // KeepAlive config for sending probes for an active + // // connection. + // KeepAliveConfig: net.KeepAliveConfig{ + // Enable: true, + // Idle: 15 * time.Second, // default + // Interval: 15 * time.Second, // default + // Count: 2, // default would be 9 + // }, + } + + // Re: wasm: see + // https://cs.opensource.google/go/go/+/266626211e40d1f2c3a34fa4cd2023f5310cbd7d + // In wasm builds custom Dialer gets ignored. DefaultTransport makes + // sure it sets DialContext to nil for wasm builds as to not break the + // "contract". Probably makes no difference in the end, but we do the + // same, just in case. + dialCtx := netdialer.DialContext + if http.DefaultTransport.(*http.Transport).DialContext == nil { + dialCtx = nil + } + + tlsCfg := &tls.Config{ + InsecureSkipVerify: htnet.insecureSkipVerify, + } + + t := &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: http.ProxyFromEnvironment, + DialContext: dialCtx, + ForceAttemptHTTP2: true, + // MaxIdleConns: how many keep-alive conns can we have without + // requests. + MaxIdleConns: htnet.maxIdleConns, + // IdleConnTimeout: how long can a keep-alive connection stay + // around without requests. + IdleConnTimeout: htnet.idleConnTimeout, + ResponseHeaderTimeout: htnet.responseHeaderTimeout, + ExpectContinueTimeout: 1 * time.Second, + MaxResponseHeaderBytes: 2 << 10, // 2KiB + ReadBufferSize: 16 << 10, // 16KiB. Default is 4KiB. 16KiB is max TLS buffer size. + } + + c := &http.Client{ + Transport: t, + } + htnet.client = c + + pinger := newPinger(htnet, pingCid) + htnet.pinger = pinger + + for i := 0; i < htnet.httpWorkers; i++ { + go htnet.httpWorker(i) + } + + return htnet +} + +// Start sets up the given receivers to be notified when message responses are +// received. It also starts the connection event manager. Start must be called +// before using the Network. +func (ht *Network) Start(receivers ...network.Receiver) { + allowlist := make([]string, 0, len(ht.allowlist)) + for k := range ht.allowlist { + allowlist = append(allowlist, k) + } + log.Infof("httpnet: HTTP retrieval system started with allowlist: %s", strings.Join(allowlist, ",")) + ht.receivers = receivers + connectionListeners := make([]network.ConnectionListener, len(receivers)) + for i, v := range receivers { + connectionListeners[i] = v + } + ht.connEvtMgr = network.NewConnectEventManager(connectionListeners...) + + ht.connEvtMgr.Start() +} + +// Stop stops the connect event manager associated with this network. +// Other methods should no longer be used after calling Stop(). 
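Per the CHANGELOG, upstream implementations opt in by changing how they build the Bitswap network; in real wiring the Bitswap client is the `network.Receiver` and `Start` is invoked as part of its construction. The standalone sketch below makes the Start/Connect/Stop lifecycle explicit with a no-op receiver; the receiver type and the gateway address are illustrative:

```go
package main

import (
	"context"

	bsmsg "github.com/ipfs/boxo/bitswap/message"
	"github.com/ipfs/boxo/bitswap/network/httpnet"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

// noopRecv is an illustrative stand-in for the Bitswap client.
type noopRecv struct{}

func (noopRecv) ReceiveMessage(ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) {}
func (noopRecv) ReceiveError(err error)     {}
func (noopRecv) PeerConnected(p peer.ID)    {}
func (noopRecv) PeerDisconnected(p peer.ID) {}

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}

	net := httpnet.New(h)
	net.Start(noopRecv{}) // must be called before using the network
	defer net.Stop()

	// Connect probes each HTTP endpoint with GET (and HEAD) requests for
	// the empty identity CID before marking the peer connected.
	gateway := peer.AddrInfo{
		ID: "", // provider's peer ID, elided here
		Addrs: []multiaddr.Multiaddr{
			multiaddr.StringCast("/dns4/gateway.example.com/tcp/443/tls/http"),
		},
	}
	if err := net.Connect(context.Background(), gateway); err != nil {
		// ErrNoSuccess: none of the endpoints answered our probe
	}
}
```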
+func (ht *Network) Stop() {
+	ht.connEvtMgr.Stop()
+	ht.cooldownTracker.stopCleaner()
+	ht.closeOnce.Do(func() {
+		close(ht.closing)
+	})
+}
+
+// Ping triggers a ping to the given peer and returns the latency.
+func (ht *Network) Ping(ctx context.Context, p peer.ID) ping.Result {
+	return ht.pinger.ping(ctx, p)
+}
+
+// Latency returns the EWMA latency for the given peer.
+func (ht *Network) Latency(p peer.ID) time.Duration {
+	return ht.pinger.latency(p)
+}
+
+func (ht *Network) senderURLs(p peer.ID) []*senderURL {
+	pi := ht.host.Peerstore().PeerInfo(p)
+	urls := network.ExtractURLsFromPeer(pi)
+	if len(urls) == 0 {
+		return nil
+	}
+	return ht.cooldownTracker.fillSenderURLs(urls)
+}
+
+// SendMessage sends the given message to the given peer. It uses
+// NewMessageSender under the hood, with default options.
+func (ht *Network) SendMessage(ctx context.Context, p peer.ID, msg bsmsg.BitSwapMessage) error {
+	if len(msg.Wantlist()) == 0 {
+		return nil
+	}
+
+	log.Debugf("SendMessage: %s", p)
+
+	// Note: SendMessage seems to only be used to send cancellations.
+	// So default options are fine.
+	sender, err := ht.NewMessageSender(ctx, p, nil)
+	if err != nil {
+		return err
+	}
+	return sender.SendMsg(ctx, msg)
+}
+
+// Self returns the local peer ID.
+func (ht *Network) Self() peer.ID {
+	return ht.host.ID()
+}
+
+// Connect attempts to set up an HTTP connection to the given peer. The given
+// AddrInfo must include at least one HTTP endpoint for the peer. HTTP URLs in
+// AddrInfo will be tried by making an HTTP GET request to
+// "ipfs/bafkqaaa", which is the CID for an empty raw block (inlined).
+// Any completed request, regardless of the HTTP response, is considered a
+// connection success and marks this peer as "connected", setting it up to
+// handle messages and make requests. The peer will be pinged regularly to
+// collect latency measurements until DisconnectFrom() is called.
+func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error {
+	existingPeerAddrs := ht.host.Peerstore().Addrs(p.ID)
+	existingPeerURLs := network.ExtractURLsFromPeer(peer.AddrInfo{
+		ID:    p.ID,
+		Addrs: existingPeerAddrs,
+	})
+	if len(existingPeerURLs) > 0 {
+		// Assume already connected. FIXME: issues when different
+		// provider records contain different HTTP urls for the same
+		// peer. Hope no one does that.
+		return nil
+	}
+
+	htaddrs, _ := network.SplitHTTPAddrs(p)
+	if len(htaddrs.Addrs) == 0 {
+		return ErrNoHTTPAddresses
+	}
+
+	// avoid funny things like someone adding 100 broken urls to a peer.
+	if len(htaddrs.Addrs) > ht.maxHTTPAddressesPerPeer {
+		htaddrs.Addrs = htaddrs.Addrs[0:ht.maxHTTPAddressesPerPeer]
+	}
+
+	urls := network.ExtractURLsFromPeer(htaddrs)
+	if len(ht.allowlist) > 0 {
+		var filteredURLs []network.ParsedURL
+		var filteredAddrs []multiaddr.Multiaddr
+		for i, u := range urls {
+			host, _, err := net.SplitHostPort(u.URL.Host)
+			if err != nil {
+				return err
+			}
+			if _, ok := ht.allowlist[host]; ok {
+				filteredURLs = append(filteredURLs, u)
+				filteredAddrs = append(filteredAddrs, htaddrs.Addrs[i])
+			}
+		}
+		urls = filteredURLs
+		htaddrs.Addrs = filteredAddrs
+	}
+
+	// If len(filteredURLs) == 0, nothing will happen in the loop below
+	// and we will return an error.
+
+	// We will now try to talk to this peer by making HTTP requests to its urls
+	// and recording which ones work.
+	// This allows re-using the connections that we are about to open next
+	// time with the client. We call peer.Connected()
+	// on success.
+	var workingAddrs []multiaddr.Multiaddr
+	supportsHead := true
+	for i, u := range urls {
+		err := ht.connect(ctx, p.ID, u, "GET")
+		if err != nil {
+			// abort if context cancelled
+			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+				return err
+			}
+			continue
+		}
+		// GET works. Does HEAD work though?
+		err = ht.connect(ctx, p.ID, u, "HEAD")
+		if err != nil {
+			// abort if context cancelled
+			if ctxErr := ctx.Err(); ctxErr != nil {
+				return ctxErr
+			}
+			supportsHead = false
+		}
+
+		workingAddrs = append(workingAddrs, htaddrs.Addrs[i])
+	}
+
+	if len(workingAddrs) > 0 {
+		ht.host.Peerstore().AddAddrs(p.ID, workingAddrs, peerstore.PermanentAddrTTL)
+		// ignoring error
+		_ = ht.host.Peerstore().Put(p.ID, peerstoreSupportsHeadKey, supportsHead)
+
+		ht.connEvtMgr.Connected(p.ID)
+		ht.pinger.startPinging(p.ID)
+		log.Debugf("connect success to %s (supports HEAD: %t)", p.ID, supportsHead)
+		// We "connected"
+		return nil
+	}
+
+	err := fmt.Errorf("connect failure to %s: %w", p.ID, ErrNoSuccess)
+	log.Debug(err)
+	return err
+}
+
+func (ht *Network) connect(ctx context.Context, p peer.ID, u network.ParsedURL, method string) error {
+	req, err := buildRequest(ctx, u, method, pingCid, ht.userAgent)
+	if err != nil {
+		log.Debug(err)
+		return err
+	}
+
+	log.Debugf("connect request to %s %s %q", p, method, req.URL)
+	resp, err := ht.client.Do(req)
+	if err != nil {
+		return err
+	}
+
+	// For plain HTTP, the address can only be a LAN IP, as otherwise it
+	// would have been filtered out before. So if it is HTTPS and not
+	// HTTP/2, we abort because we don't want requests to non-local hosts
+	// without HTTP/2.
+	if u.URL.Scheme == "https" && resp.Proto != http2proto {
+		err = fmt.Errorf("%s://%q is not using HTTP/2 (%s)", req.URL.Scheme, req.URL.Host, resp.Proto)
+		log.Warn(err)
+		return err
+	}
+
+	// probe success.
+	// FIXME: Storacha returns 410 for our probe.
+	if resp.StatusCode == 200 || resp.StatusCode == 204 || resp.StatusCode == 410 {
+		return nil
+	}
+
+	log.Debugf("connect error: %d <- %q (%s)", resp.StatusCode, req.URL, p)
+	// We made a proper request and got an unexpected status back.
+	// We cannot consider this a working connection.
+	return errors.New("response status code is not 200")
+}
+
+// DisconnectFrom marks this peer as Disconnected in the connection event
+// manager, stops pinging for latency measurements and removes it from the
+// peerstore.
+func (ht *Network) DisconnectFrom(ctx context.Context, p peer.ID) error {
+	pi := ht.host.Peerstore().PeerInfo(p)
+	_, bsaddrs := network.SplitHTTPAddrs(pi)
+	ht.host.Peerstore().ClearAddrs(p)
+	if len(bsaddrs.Addrs) == 0 {
+		// this should always be the case unless we have been
+		// contacted via bitswap...
+		ht.connEvtMgr.Disconnected(p)
+	} else { // re-add bitswap addresses
+		// unfortunately we cannot maintain ttl info
+		ht.host.Peerstore().SetAddrs(p, bsaddrs.Addrs, peerstore.TempAddrTTL)
+	}
+	ht.pinger.stopPinging(p)
+
+	// coolDownTracker: we leave it untouched. We want to keep
+	// ongoing cooldowns there in case we reconnect to this peer.
+
+	return nil
+}
+
+// TagPeer uses the host's ConnManager to tag a peer.
+func (ht *Network) TagPeer(p peer.ID, tag string, w int) {
+	ht.host.ConnManager().TagPeer(p, tag, w)
+}
+
+// UntagPeer uses the host's ConnManager to untag a peer.
+func (ht *Network) UntagPeer(p peer.ID, tag string) {
+	ht.host.ConnManager().UntagPeer(p, tag)
+}
+
+// Protect does nothing. The purpose of Protect is to maintain connections as
+// long as they are used.
But our connections are already kept alive
+// while they are used, and closed when they are not.
+func (ht *Network) Protect(p peer.ID, tag string) {
+}
+
+// Unprotect does nothing. The purpose of Unprotect is to be able to close
+// connections when they are no longer relevant. Our connections are already
+// closed when they are not used. It always returns true, as technically our
+// connections are potentially still protected as long as they are used.
+func (ht *Network) Unprotect(p peer.ID, tag string) bool {
+	return true
+}
+
+// Stats returns message counts for this network. Each message sent is an HTTP
+// request. Each message received is an HTTP response.
+func (ht *Network) Stats() network.Stats {
+	return network.Stats{
+		MessagesRecvd: atomic.LoadUint64(&ht.stats.MessagesRecvd),
+		MessagesSent:  atomic.LoadUint64(&ht.stats.MessagesSent),
+	}
+}
+
+func (ht *Network) httpWorker(i int) {
+	for {
+		select {
+		case <-ht.closing:
+			return
+		case reqInfo := <-ht.httpRequests:
+			retryLaterErrors := 0
+			var urlIgnore []*senderURL
+			for {
+				// bestURL
+				u, err := reqInfo.sender.bestURL(urlIgnore)
+				if err != nil {
+					reqInfo.result <- httpResult{
+						info: reqInfo,
+						err: &senderError{
+							Type: typeFatal,
+							Err:  err,
+						},
+					}
+					break // stop retry loop
+				}
+
+				// no urls to retry left.
+				if u == nil {
+					reqInfo.result <- httpResult{
+						info: reqInfo,
+						err: &senderError{
+							Type: typeClient,
+							Err:  nil,
+						},
+					}
+					break // stop retry loop
+				}
+
+				b, serr := reqInfo.sender.tryURL(
+					reqInfo.ctx,
+					u,
+					reqInfo.entry,
+				)
+
+				result := httpResult{
+					info:  reqInfo,
+					block: b,
+					err:   serr,
+				}
+
+				if serr != nil {
+					switch serr.Type {
+					case typeRetryLater:
+						// This error signals that we should
+						// retry, but if things keep failing
+						// we treat it as a server error. With
+						// multiple urls, retries may happen
+						// on a different url.
+						retryLaterErrors++
+						if retryLaterErrors%2 == 0 {
+							// we retried the same CID 2 times. No luck.
+							// Increase server errors.
+							// Start ignoring urls.
+							result.err.Type = typeServer
+							urlIgnore = append(urlIgnore, u)
+							u.serverErrors.Add(1)
+						}
+						continue // retry request again
+					case typeClient:
+						urlIgnore = append(urlIgnore, u)
+						continue // retry again ignoring current url
+					case typeContext:
+					case typeFatal:
+						log.Error(serr.Err)
+					case typeServer:
+						u.serverErrors.Add(1)
+						continue // retry until bestURL forces abort
+
+					default:
+						panic("unknown sender error type")
+					}
+				}
+
+				reqInfo.result <- result
+				break // exit retry loop
+			}
+		}
+	}
+}
+
+// buildRequest sets up common settings for making a request.
+func buildRequest(ctx context.Context, u network.ParsedURL, method string, cid string, userAgent string) (*http.Request, error) {
+	// copy url
+	sendURL, _ := url.Parse(u.URL.String())
+	sendURL.RawQuery = "format=raw"
+	sendURL.Path += "/ipfs/" + cid
+
+	req, err := http.NewRequestWithContext(ctx,
+		method,
+		sendURL.String(),
+		nil,
+	)
+	if err != nil {
+		log.Error(err)
+		return nil, err
+	}
+
+	headers := make(http.Header)
+	headers.Add("Accept", "application/vnd.ipld.raw")
+	headers.Add("User-Agent", userAgent)
+	if u.SNI != "" {
+		headers.Add("Host", u.SNI)
+	}
+	req.Header = headers
+	return req, nil
+}
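`buildRequest` above is essentially the whole wire protocol: one plain trustless-gateway request per block, with `?format=raw` and the raw-block `Accept` header; `HEAD` probes existence (want-have) while `GET` retrieves (want-block). A package-internal sketch of the request it assembles; the gateway address is a placeholder:

```go
package httpnet

import (
	"context"
	"fmt"

	"github.com/ipfs/boxo/bitswap/network"
	"github.com/multiformats/go-multiaddr"
)

// requestSketch is illustrative only: it prints the request that
// buildRequest assembles for a given endpoint and CID.
func requestSketch() {
	u, err := network.ExtractHTTPAddress(
		multiaddr.StringCast("/dns4/gateway.example.com/tcp/443/tls/http"))
	if err != nil {
		panic(err)
	}

	req, err := buildRequest(context.Background(), u, "GET", pingCid, DefaultUserAgent)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL)
	// GET https://gateway.example.com:443/ipfs/bafkqaaa?format=raw
	// plus headers: Accept: application/vnd.ipld.raw and User-Agent.
}
```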
+func (ht *Network) NewMessageSender(ctx context.Context, p peer.ID, opts *network.MessageSenderOpts) (network.MessageSender, error) { + // cooldowns made by other senders between now and SendMsg will not be + // taken into account since we access that info here only. From that + // point, we only react to cooldowns/errors received by this message + // sender and not others. This is mostly fine given how MessageSender + // is used as part of MessageQueue: + // + // * We expect peers to be associated with single urls so there will + // not be multiple message sender for the same url normally. + // * We remember cooldowns between message senders (i.e. when a queue + // dies and a new one is created). + // * We track cooldowns in the urls for the lifetime of this sender. + // + // This way we minimize lock contention around the cooldown map, with + // one read access per message sender only. + urls := ht.senderURLs(p) + if len(urls) == 0 { + return nil, ErrNoHTTPAddresses + } + + log.Debugf("NewMessageSender: %s", p) + senderOpts := setSenderOpts(opts) + + return &httpMsgSender{ + // ctx ?? + ht: ht, + peer: p, + urls: urls, + closing: make(chan struct{}, 1), + opts: senderOpts, + }, nil +} + +// defaultUserAgent returns a useful user agent version string allowing us to +// identify requests coming from official releases of this module vs forks. +func defaultUserAgent() (ua string) { + p := reflect.ValueOf(Network{}).Type().PkgPath() + // we have monorepo, so stripping the remainder + importPath := strings.TrimSuffix(p, "/bitswap/network/httpnet") + + ua = importPath + var module *debug.Module + if bi, ok := debug.ReadBuildInfo(); ok { + // If debug.ReadBuildInfo was successful, we can read Version by finding + // this client in the dependency list of the app that has it in go.mod + for _, dep := range bi.Deps { + if dep.Path == importPath { + module = dep + break + } + } + if module != nil { + ua += "@" + module.Version + return + } + ua += "@unknown" + } + return +} diff --git a/bitswap/network/httpnet/httpnet_test.go b/bitswap/network/httpnet/httpnet_test.go new file mode 100644 index 000000000..30b4091a6 --- /dev/null +++ b/bitswap/network/httpnet/httpnet_test.go @@ -0,0 +1,545 @@ +package httpnet + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + pb "github.com/ipfs/boxo/bitswap/message/pb" + "github.com/ipfs/boxo/bitswap/network" + "github.com/ipfs/boxo/blockstore" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + ipld "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var errorCid = cid.MustParse("bafkreiachshsblgr5kv3mzbgfgmvuhllwe2f6fasm6mykzwsi4l7odq464") // "errorcid" +var slowCid = cid.MustParse("bafkreidhph5i4jevaun4eqjxolqgn3rfpoknj35ocyos3on57iriwpaujm") // "slowcid" +var backoffCid = cid.MustParse("bafkreid6g5qrufgqj46djic7ntjnppaj5bg4urppjoyywrxwegvltrmqbu") // "backoff" + +var _ network.Receiver = (*mockRecv)(nil) + +type mockRecv struct { + blocks map[cid.Cid]struct{} + haves map[cid.Cid]struct{} + donthaves map[cid.Cid]struct{} + waitCh chan struct{} +} + +func (recv *mockRecv) 
ReceiveMessage(ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) { + for _, b := range incoming.Blocks() { + recv.blocks[b.Cid()] = struct{}{} + } + + for _, c := range incoming.Haves() { + recv.haves[c] = struct{}{} + } + + for _, c := range incoming.DontHaves() { + recv.donthaves[c] = struct{}{} + } + + recv.waitCh <- struct{}{} +} + +func (recv *mockRecv) wait(seconds time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), seconds*time.Second) + defer cancel() + select { + case <-ctx.Done(): + return errors.New("receiver waited too long without receiving message") + case <-recv.waitCh: + return nil + } +} + +func (recv *mockRecv) ReceiveError(err error) { + +} + +func (recv *mockRecv) PeerConnected(p peer.ID) { + +} + +func (recv *mockRecv) PeerDisconnected(p peer.ID) { + +} + +func mockReceiver(t *testing.T) *mockRecv { + t.Helper() + return &mockRecv{ + blocks: make(map[cid.Cid]struct{}), + haves: make(map[cid.Cid]struct{}), + donthaves: make(map[cid.Cid]struct{}), + waitCh: make(chan struct{}, 1), + } + +} + +func mockNet(t *testing.T) mocknet.Mocknet { + t.Helper() + + return mocknet.New() +} + +func mockNetwork(t *testing.T, recv network.Receiver, opts ...Option) (*Network, mocknet.Mocknet) { + t.Helper() + + mn := mockNet(t) + + h, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + opts = append(opts, WithInsecureSkipVerify(true)) + htnet := New(h, opts...) + htnet.Start(recv) + return htnet.(*Network), mn +} + +func makeBlocks(t *testing.T, start, end int) []blocks.Block { + t.Helper() + + var blks []blocks.Block + for i := start; i < end; i++ { + blks = append(blks, blocks.NewBlock([]byte(fmt.Sprintf("%d", i)))) + } + return blks +} + +func makeCids(t *testing.T, start, end int) []cid.Cid { + t.Helper() + + var cids []cid.Cid + blks := makeBlocks(t, start, end) + for _, b := range blks { + cids = append(cids, b.Cid()) + } + return cids +} + +func makeMessage(wantlist []cid.Cid, wantType pb.Message_Wantlist_WantType, sendDontHave bool) bsmsg.BitSwapMessage { + msg := bsmsg.New(true) + for _, c := range wantlist { + msg.AddEntry( + c, + 0, + wantType, + sendDontHave, + ) + + } + return msg +} + +func makeWantsMessage(wantlist []cid.Cid) bsmsg.BitSwapMessage { + return makeMessage(wantlist, pb.Message_Wantlist_Block, true) +} + +func makeHavesMessage(wantlist []cid.Cid) bsmsg.BitSwapMessage { + return makeMessage(wantlist, pb.Message_Wantlist_Have, true) +} + +func makeBlockstore(t *testing.T, start, end int) blockstore.Blockstore { + t.Helper() + + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + + blks := makeBlocks(t, start, end) + + ctx := context.Background() + for _, b := range blks { + err := bs.Put(ctx, b) + if err != nil { + t.Fatal(err) + } + } + return bs +} + +type Handler struct { + bstore blockstore.Blockstore +} + +func (h *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + path := r.URL.Path + _, cidstr, ok := strings.Cut(path, "/ipfs/") + if !ok { + rw.WriteHeader(http.StatusBadRequest) + return + } + + c, err := cid.Parse(cidstr) + if err != nil { + rw.WriteHeader(http.StatusBadRequest) + return + } + + if cidstr == pingCid { + rw.WriteHeader(http.StatusOK) + return + } + + if c.Equals(errorCid) { + rw.WriteHeader(http.StatusInternalServerError) + return + } + + if c.Equals(backoffCid) { + rw.Header().Set("Retry-After", "5") + rw.WriteHeader(http.StatusTooManyRequests) + } + + if c.Equals(slowCid) { + time.Sleep(2 * time.Second) + } + + b, err := h.bstore.Get(r.Context(), c) + 
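+	// go-ipld-format's ErrNotFound implements an Is method, so errors.Is
+	// also matches wrapped or parameterized not-found errors returned by
+	// the blockstore.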
if errors.Is(err, ipld.ErrNotFound{}) { + rw.WriteHeader(http.StatusNotFound) + return + } + if err != nil { + rw.WriteHeader(http.StatusInternalServerError) + return + } + + rw.WriteHeader(http.StatusOK) + if r.Method == "HEAD" { + return + } + + rw.Write(b.RawData()) +} + +func makeServer(t *testing.T, bstart, bend int) *httptest.Server { + t.Helper() + + handler := &Handler{ + bstore: makeBlockstore(t, bstart, bend), + } + + srv := httptest.NewUnstartedServer(handler) + srv.EnableHTTP2 = true + srv.StartTLS() + return srv +} + +func srvMultiaddr(t *testing.T, srv *httptest.Server) multiaddr.Multiaddr { + t.Helper() + + maddr, err := manet.FromNetAddr(srv.Listener.Addr()) + if err != nil { + t.Fatal(err) + } + + httpma, err := multiaddr.NewMultiaddr("/https") + if err != nil { + t.Fatal(err) + } + + return maddr.Encapsulate(httpma) +} + +func associateServerToPeer(t *testing.T, srv *httptest.Server, h, remote host.Host) { + h.Peerstore().AddAddr( + remote.ID(), + srvMultiaddr(t, srv), + peerstore.PermanentAddrTTL, + ) +} + +func TestBestURL(t *testing.T) { + ctx := context.Background() + htnet, mn := mockNetwork(t, mockReceiver(t)) + peer, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + msrv := makeServer(t, 0, 0) + associateServerToPeer(t, msrv, htnet.host, peer) + + nms, err := htnet.NewMessageSender( + ctx, + peer.ID(), + &network.MessageSenderOpts{ + MaxRetries: 5, + }, + ) + if err != nil { + t.Fatal(err) + } + + ms := nms.(*httpMsgSender) + + baseurl, err := url.Parse("http://127.0.0.1/ipfs") + if err != nil { + t.Fatal(err) + } + var urls []*url.URL + for i := 0; i < 4; i++ { + baseurl.Host = fmt.Sprintf("127.0.0.1:%d", 1000+i) + u, _ := url.Parse(baseurl.String()) + urls = append(urls, u) + } + // add some bogus urls to test the sorting + now := time.Now() + surls := []*senderURL{ + { + ParsedURL: network.ParsedURL{ + URL: urls[0], + }, + }, + { + ParsedURL: network.ParsedURL{ + URL: urls[1], + }, + }, + { + ParsedURL: network.ParsedURL{ + URL: urls[2], + }, + }, + { + ParsedURL: network.ParsedURL{ + URL: urls[3], + }, + }, + } + + surls[0].cooldown.Store(now.Add(time.Second)) + surls[0].serverErrors.Store(6) + surls[1].cooldown.Store(now.Add(time.Second)) + surls[1].serverErrors.Store(1) + surls[2].cooldown.Store(time.Time{}) + surls[2].serverErrors.Store(3) + surls[3].cooldown.Store(time.Time{}) + surls[3].serverErrors.Store(2) + + ms.urls = surls + + sortedUrls := ms.sortURLS() + + expected := []string{ + urls[3].String(), + urls[2].String(), + urls[1].String(), + urls[0].String(), + } + + for i, u := range sortedUrls { + if u.URL.String() != expected[i] { + t.Error("wrong url order", i, u.URL) + } + } + + ms.urls = sortedUrls[3:] + + _, err = ms.bestURL(nil) + if err == nil { + t.Fatal("expected error since only urls failed too many times") + } + +} + +func TestSendMessage(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + peer, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + msrv := makeServer(t, 0, 10) + associateServerToPeer(t, msrv, htnet.host, peer) + + wl := makeCids(t, 0, 10) + msg := makeWantsMessage(wl) + + err = htnet.SendMessage(ctx, peer.ID(), msg) + if err != nil { + t.Fatal(err) + } + + recv.wait(5) + + for _, c := range wl { + if _, ok := recv.blocks[c]; !ok { + t.Error("block was not received") + } + } +} + +func TestSendMessageWithFailingServer(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + peer, err := mn.GenPeer() + 
if err != nil { + t.Fatal(err) + } + msrv := makeServer(t, 0, 0) + msrv2 := makeServer(t, 0, 10) + associateServerToPeer(t, msrv, htnet.host, peer) + associateServerToPeer(t, msrv2, htnet.host, peer) + + wl := makeCids(t, 0, 10) + msg := makeWantsMessage(wl) + + err = htnet.SendMessage(ctx, peer.ID(), msg) + if err != nil { + t.Fatal(err) + } + + err = recv.wait(5) + if err != nil { + t.Fatal(err) + } + + for _, c := range wl { + if _, ok := recv.blocks[c]; !ok { + t.Error("block was not received") + } + } +} + +func TestSendMessageWithPartialResponse(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + peer, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + msrv := makeServer(t, 5, 10) + associateServerToPeer(t, msrv, htnet.host, peer) + + wl := makeCids(t, 0, 10) + msg := makeWantsMessage(wl) + + err = htnet.SendMessage(ctx, peer.ID(), msg) + if err != nil { + t.Fatal(err) + } + + recv.wait(5) + + for _, c := range wl[5:10] { + if _, ok := recv.blocks[c]; !ok { + t.Error("block was not received") + } + } + + for _, c := range wl[0:5] { + if _, ok := recv.blocks[c]; ok { + t.Error("block should not have been received") + } + } + +} + +func TestSendMessageSendHavesAndDontHaves(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + peer, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + msrv := makeServer(t, 0, 5) + associateServerToPeer(t, msrv, htnet.host, peer) + + wl := makeCids(t, 0, 10) + msg := makeHavesMessage(wl) + + err = htnet.SendMessage(ctx, peer.ID(), msg) + if err != nil { + t.Fatal(err) + } + + recv.wait(5) + + for _, c := range wl[0:5] { + if _, ok := recv.haves[c]; !ok { + t.Error("have was not received") + } + } + + for _, c := range wl[5:10] { + if _, ok := recv.donthaves[c]; !ok { + t.Error("dont_have was not received") + } + } +} + +func TestBackOff(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + + // 1 server associated to two peers. + // so that it has the same url. + // We trigger backoff using peer1 + // and the backoff should happen when making a + // request on peer2. 
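+	// (Cooldowns are tracked per URL host rather than per peer, so the
+	// Retry-After triggered via peer1 also applies to peer2.)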
+ + peer, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + peer2, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + msrv := makeServer(t, 0, 1) + associateServerToPeer(t, msrv, htnet.host, peer) + associateServerToPeer(t, msrv, htnet.host, peer2) + + nms, err := htnet.NewMessageSender(ctx, peer.ID(), nil) + if err != nil { + t.Fatal(err) + } + + wl := makeCids(t, 0, 1) + msg := makeWantsMessage([]cid.Cid{backoffCid}) + msg2 := makeWantsMessage(wl) + + err = nms.SendMsg(ctx, msg) + if err != nil { + t.Fatal(err) + } + + nms2, err := htnet.NewMessageSender(ctx, peer2.ID(), nil) + if err != nil { + t.Fatal(err) + } + + err = nms2.SendMsg(ctx, msg2) + if err != nil { + t.Fatal(err) + } + if err != nil { + t.Fatal(err) + } + + if len(recv.blocks) > 0 || len(recv.donthaves) > 0 { + t.Error("no blocks should have been received while on backoff") + } +} diff --git a/bitswap/network/httpnet/metrics.go b/bitswap/network/httpnet/metrics.go new file mode 100644 index 000000000..077aa727d --- /dev/null +++ b/bitswap/network/httpnet/metrics.go @@ -0,0 +1,111 @@ +package httpnet + +import ( + "context" + "strconv" + + imetrics "github.com/ipfs/go-metrics-interface" +) + +var durationHistogramBuckets = []float64{0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30, 60, 120, 240, 480, 960, 1920} + +var blockSizesHistogramBuckets = []float64{1, 128 << 10, 256 << 10, 512 << 10, 1024 << 10, 2048 << 10, 4092 << 10} + +func requestsInFlight(ctx context.Context) imetrics.Gauge { + return imetrics.NewCtx(ctx, "requests_in_flight", "Current number of in-flight requests").Gauge() +} + +func requestsTotal(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "requests_total", "Total request count").Counter() +} + +func requestsFailure(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "requests_failure", "Failed (no response, dial error etc) requests count").Counter() +} + +func requestSentBytes(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "request_sent_bytes", "Total bytes sent on requests").Counter() +} + +func requestTime(ctx context.Context) imetrics.Histogram { + return imetrics.NewCtx(ctx, "request_duration_seconds", "Histogram of request durations").Histogram(durationHistogramBuckets) +} + +func requestsBodyFailure(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "requests_body_failure", "Failure count when reading response body").Counter() +} + +func responseSizes(ctx context.Context) imetrics.Histogram { + return imetrics.NewCtx(ctx, "response_bytes", "Histogram of http response sizes").Histogram(blockSizesHistogramBuckets) +} + +func wantlistsTotal(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "wantlists_total", "Total number of wantlists sent").Counter() +} + +func wantlistsItemsTotal(ctx context.Context) imetrics.Counter { + return imetrics.NewCtx(ctx, "wantlists_items_total", "Total number of elements in sent wantlists").Counter() +} + +func wantlistsSeconds(ctx context.Context) imetrics.Histogram { + return imetrics.NewCtx(ctx, "wantlists_seconds", "Number of seconds spent sending wantlists").Histogram(durationHistogramBuckets) +} + +func status(ctx context.Context) imetrics.CounterVec { + return imetrics.NewCtx(ctx, "status", "Request status count").CounterVec([]string{"method", "status", "host"}) +} + +type metrics struct { + trackedEndpoints map[string]struct{} + + RequestsInFlight imetrics.Gauge + RequestsTotal imetrics.Counter + RequestsFailure imetrics.Counter + RequestsSentBytes 
imetrics.Counter + WantlistsTotal imetrics.Counter + WantlistsItemsTotal imetrics.Counter + WantlistsSeconds imetrics.Histogram + ResponseSizes imetrics.Histogram + RequestsBodyFailure imetrics.Counter + Status imetrics.CounterVec + RequestTime imetrics.Histogram +} + +func newMetrics(endpoints map[string]struct{}) *metrics { + ctx := imetrics.CtxScope(context.Background(), "exchange_httpnet") + + return &metrics{ + trackedEndpoints: endpoints, + RequestsInFlight: requestsInFlight(ctx), + RequestsTotal: requestsTotal(ctx), + RequestsSentBytes: requestSentBytes(ctx), + RequestsFailure: requestsFailure(ctx), + RequestsBodyFailure: requestsBodyFailure(ctx), + WantlistsTotal: wantlistsTotal(ctx), + WantlistsItemsTotal: wantlistsItemsTotal(ctx), + WantlistsSeconds: wantlistsSeconds(ctx), + ResponseSizes: responseSizes(ctx), + // labels: method, status, host + Status: status(ctx), + RequestTime: requestTime(ctx), + } +} + +func (m *metrics) updateStatusCounter(method string, statusCode int, host string) { + m.RequestsTotal.Inc() + // Track all for the moment. + // if _, ok := m.trackedEndpoints[host]; !ok { + // host = "other" + // } + + var statusStr string + + switch statusCode { + case 200, 400, 403, 404, 410, 429, 451, 500, 502, 504: + statusStr = strconv.Itoa(statusCode) + default: + statusStr = "other" + } + + m.Status.WithLabelValues(method, statusStr, host).Inc() +} diff --git a/bitswap/network/httpnet/msg_sender.go b/bitswap/network/httpnet/msg_sender.go new file mode 100644 index 000000000..1864ae3c8 --- /dev/null +++ b/bitswap/network/httpnet/msg_sender.go @@ -0,0 +1,626 @@ +package httpnet + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "slices" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + pb "github.com/ipfs/boxo/bitswap/message/pb" + "github.com/ipfs/boxo/bitswap/network" + blocks "github.com/ipfs/go-block-format" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" +) + +// MessageSender option defaults. +const ( + // DefaultMaxRetries specifies how many requests to make to available + // HTTP endpoints in case of failure. + DefaultMaxRetries = 1 + // DefaultSendTimeout specifies sending each individual HTTP + // request can take. + DefaultSendTimeout = 5 * time.Second + // SendErrorBackoff specifies how long to wait between retries to the + // same endpoint after failure. It is overriden by Retry-After + // headers and must be at least 50ms. + DefaultSendErrorBackoff = time.Second +) + +func setSenderOpts(opts *network.MessageSenderOpts) network.MessageSenderOpts { + defopts := network.MessageSenderOpts{ + MaxRetries: DefaultMaxRetries, + SendTimeout: DefaultSendTimeout, + SendErrorBackoff: DefaultSendErrorBackoff, + } + + if opts == nil { + return defopts + } + + if mr := opts.MaxRetries; mr > 0 { + defopts.MaxRetries = mr + } + if st := opts.SendTimeout; st > time.Second { + defopts.SendTimeout = st + } + if seb := opts.SendErrorBackoff; seb > 50*time.Millisecond { + defopts.SendErrorBackoff = seb + } + return defopts +} + +// senderURL wraps url with information about cooldowns and errors. +type senderURL struct { + network.ParsedURL + cooldown atomic.Value + serverErrors atomic.Int64 +} + +// httpMsgSender implements a network.MessageSender. +// For NewMessageSender see func (ht *httpnet) NewMessageSender(...) 
+type httpMsgSender struct {
+	peer      peer.ID
+	urls      []*senderURL
+	ht        *Network
+	opts      network.MessageSenderOpts
+	closing   chan struct{}
+	closeOnce sync.Once
+}
+
+// For NewMessageSender see func (ht *httpnet) NewMessageSender(...)
+
+// sortURLS sorts the sender urls as follows:
+// - urls with exhausted retries go to the end
+// - urls are sorted by cooldown (shorter first)
+// - same, or no cooldown, are sorted by number of server errors
+func (sender *httpMsgSender) sortURLS() []*senderURL {
+	if len(sender.urls) <= 1 {
+		return sender.urls
+	}
+
+	// sender.urls must be read-only as multiple workers
+	// attempt to sort it.
+	urlCopy := make([]*senderURL, len(sender.urls))
+	copy(urlCopy, sender.urls)
+
+	slices.SortFunc(urlCopy, func(a, b *senderURL) int {
+		// urls without exhausted retries come first
+		serverErrorsA := a.serverErrors.Load()
+		serverErrorsB := b.serverErrors.Load()
+		if serverErrorsA >= int64(sender.opts.MaxRetries) {
+			return 1 // a > b
+		}
+		if serverErrorsB >= int64(sender.opts.MaxRetries) {
+			return -1 // a < b
+		}
+
+		cooldownA := a.cooldown.Load().(time.Time)
+		cooldownB := b.cooldown.Load().(time.Time)
+		dlComp := cooldownA.Compare(cooldownB)
+		if dlComp != 0 {
+			return dlComp
+		}
+
+		return int(serverErrorsA - serverErrorsB)
+	})
+	return urlCopy
+}
+
+// bestURL calls sortURLS and returns the first url of the list that is not
+// in the ignore list. The returned senderURL can be nil when no valid URL was
+// found (i.e. there are valid urls but they are also in the ignore list).
+// An error is only returned when all urls have exceeded maxRetries (abort).
+func (sender *httpMsgSender) bestURL(ignore []*senderURL) (*senderURL, error) {
+	urls := sender.sortURLS()
+
+	ignoreMap := make(map[*senderURL]struct{}, len(ignore))
+	for _, ig := range ignore {
+		ignoreMap[ig] = struct{}{}
+	}
+
+	var first *senderURL
+	for _, u := range urls {
+		if _, ok := ignoreMap[u]; !ok {
+			first = u
+			break
+		}
+	}
+
+	if first == nil {
+		return nil, nil
+	}
+
+	if first.serverErrors.Load() >= int64(sender.opts.MaxRetries) {
+		return nil, errors.New("urls exceeded server errors")
+	}
+
+	return first, nil
+}
+
+// senderErrorType explains why a request failed.
+type senderErrorType int
+
+const (
+	// Usually errors that require aborting processing a wantlist.
+	typeFatal senderErrorType = 0
+	// Usually errors like 404, etc. which do not signal any issues
+	// on the server.
+	typeClient senderErrorType = 1
+	// Usually errors that signal issues in the server.
+	typeServer senderErrorType = 2
+	// Usually errors due to cancelled contexts or timeouts.
+	typeContext senderErrorType = 3
+	// Errors due to 429 and 503 (retry later)
+	typeRetryLater senderErrorType = 4
+)
+
+// senderError attaches a type to a regular error. Implements the error
+// interface.
+type senderError struct {
+	Type senderErrorType
+	Err  error
+}
+
+// Error returns the underlying error message.
+func (err senderError) Error() string {
+	return err.Err.Error()
+}
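For orientation, the following hypothetical helper (not part of the diff) summarizes how `tryURL` below maps response status codes onto these error types; the real mapping happens inline:

```go
// classifyStatus sketches tryURL's status handling. isErr=false means
// the request succeeded and no senderError is produced.
func classifyStatus(status int) (errType senderErrorType, isErr bool) {
	switch status {
	case http.StatusOK:
		return 0, false // success
	case http.StatusNotFound, http.StatusForbidden,
		http.StatusGone, http.StatusUnavailableForLegalReasons:
		return typeClient, true // server healthy, block unavailable
	case http.StatusTooManyRequests, http.StatusServiceUnavailable,
		http.StatusBadGateway, http.StatusGatewayTimeout:
		return typeRetryLater, true // back off, honoring Retry-After
	default:
		return typeServer, true // counts against the url's error budget
	}
}
```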
+// tryURL attempts to make a request to the given URL using the given entry.
+// Blocks, Haves etc. are recorded in the given response. Cancellations are
+// processed. tryURL returns an error so that it can be decided what to do
+// next: i.e. retry, or move to the next item in the wantlist, or abort
+// completely.
+func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsmsg.Entry) (blocks.Block, *senderError) {
+	// bail out early while the url is in a cooldown period
+	if dl := u.cooldown.Load().(time.Time); !dl.IsZero() {
+		return nil, &senderError{
+			Type: typeRetryLater,
+			Err:  fmt.Errorf("%q is in cooldown period", u.URL),
+		}
+	}
+
+	var method string
+
+	switch {
+	case entry.WantType == pb.Message_Wantlist_Block:
+		method = "GET"
+	case entry.WantType == pb.Message_Wantlist_Have:
+		method = "HEAD"
+	default:
+		panic("unknown bitswap entry type")
+	}
+
+	// We do not abort ongoing requests. This is known to cause "http2:
+	// server sent GOAWAY and closed the connection". Losing a connection
+	// is worse than downloading some extra bytes. We do abort if the
+	// context WAS already cancelled before making the request.
+	if err := ctx.Err(); err != nil {
+		log.Debugf("aborted before sending: %s %q", method, u.ParsedURL.URL)
+		return nil, &senderError{
+			Type: typeContext,
+			Err:  err,
+		}
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), sender.opts.SendTimeout)
+	defer cancel()
+	req, err := buildRequest(ctx, u.ParsedURL, method, entry.Cid.String(), sender.ht.userAgent)
+	if err != nil {
+		return nil, &senderError{
+			Type: typeFatal,
+			Err:  err,
+		}
+	}
+
+	log.Debugf("%d/%d %s %q", u.serverErrors.Load(), sender.opts.MaxRetries, method, req.URL)
+	atomic.AddUint64(&sender.ht.stats.MessagesSent, 1)
+	sender.ht.metrics.RequestsInFlight.Inc()
+	resp, err := sender.ht.client.Do(req)
+	if err != nil {
+		err = fmt.Errorf("error making request to %q: %w", req.URL, err)
+		sender.ht.metrics.RequestsFailure.Inc()
+		sender.ht.metrics.RequestsInFlight.Dec()
+		log.Debug(err)
+		// Something prevents us from making a request. We cannot
+		// dial, or set up the connection perhaps. This counts as a
+		// server error (unless context cancellation). This means we
+		// allow ourselves to hit this a maximum of MaxRetries per url
+		// and Disconnect() the peer when no urls work.
+		serr := &senderError{
+			Type: typeServer,
+			Err:  err,
+		}
+
+		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			serr.Type = typeContext // cont. with next block.
+		}
+
+		return nil, serr
+	}
+	// close the body once we have read what we need from it
+	defer resp.Body.Close()
+
+	// Record request size
+	var buf bytes.Buffer
+	req.Write(&buf)
+	sender.ht.metrics.RequestsSentBytes.Add(float64((&buf).Len()))
+
+	// Handle responses
+	limReader := &io.LimitedReader{
+		R: resp.Body,
+		N: sender.ht.maxBlockSize,
+	}
+
+	body, err := io.ReadAll(limReader)
+	if err != nil {
+		// treat this as a server error
+		err = fmt.Errorf("error reading body from %q: %w", req.URL, err)
+		sender.ht.metrics.RequestsBodyFailure.Inc()
+		sender.ht.metrics.RequestsInFlight.Dec()
+		log.Debug(err)
+		return nil, &senderError{
+			Type: typeServer,
+			Err:  err,
+		}
+	}
+
+	// special cases in response handling. Happens here to simplify
+	// metrics/handling below.
+	statusCode := resp.StatusCode
+	// 1) Some gateway implementations have been observed to return 500
+	// instead of 404.
+	if statusCode == 500 &&
+		(string(body) == "ipld: could not find node" || strings.HasPrefix(string(body), "peer does not have")) {
+		log.Debugf("treating as 404: %q -> %d: %q", req.URL, resp.StatusCode, string(body))
+		statusCode = 404
+	}
+
+	// Calculate the full response size with headers and everything,
+	// so that it is comparable to bitswap message response sizes.
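+	// (resp.Body is set to nil below so that resp.Write serializes only
+	// the status line and headers; the body length, already read above,
+	// is then added back separately.)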
+	resp.Body = nil
+	var respBuf bytes.Buffer
+	resp.Write(&respBuf)
+	respLen := (&respBuf).Len() + len(body)
+
+	sender.ht.metrics.ResponseSizes.Observe(float64(respLen))
+	sender.ht.metrics.RequestsInFlight.Dec()
+	host, _, _ := net.SplitHostPort(u.URL.Host)
+	// updateStatusCounter
+	sender.ht.metrics.updateStatusCounter(req.Method, statusCode, host)
+
+	switch statusCode {
+	// Valid responses signaling unavailability of the
+	// content.
+	case http.StatusNotFound,
+		http.StatusGone,
+		http.StatusForbidden,
+		http.StatusUnavailableForLegalReasons:
+
+		err := fmt.Errorf("%s %q -> %d: %q", req.Method, req.URL, statusCode, string(body))
+		log.Debug(err)
+		// clear cooldowns since we got a proper reply
+		if !u.cooldown.Load().(time.Time).IsZero() {
+			sender.ht.cooldownTracker.remove(req.URL.Host)
+			u.cooldown.Store(time.Time{})
+		}
+
+		return nil, &senderError{
+			Type: typeClient,
+			Err:  err,
+		}
+	case http.StatusOK: // \(^°^)/
+		// clear cooldowns since we got a proper reply
+		if !u.cooldown.Load().(time.Time).IsZero() {
+			sender.ht.cooldownTracker.remove(req.URL.Host)
+			u.cooldown.Store(time.Time{})
+		}
+		log.Debugf("%s %q -> %d (%d bytes)", req.Method, req.URL, statusCode, len(body))
+
+		if req.Method == "HEAD" {
+			return nil, nil
+		}
+		// GET
+		b, err := blocks.NewBlockWithCid(body, entry.Cid)
+		if err != nil {
+			log.Errorf("block received for cid %s does not match!", entry.Cid)
+			// avoid entertaining servers that send us wrong data
+			// too much.
+			return nil, &senderError{
+				Type: typeServer,
+				Err:  err,
+			}
+		}
+		atomic.AddUint64(&sender.ht.stats.MessagesRecvd, 1)
+		return b, nil
+
+	case http.StatusTooManyRequests,
+		http.StatusServiceUnavailable,
+		http.StatusBadGateway,
+		http.StatusGatewayTimeout:
+		// See the path-gateway spec. All these codes SHOULD return
+		// Retry-After. They are also used to signal that a block
+		// cannot be fetched, not only fatal server issues, which poses
+		// a difficult overlap. The current approach treats these
+		// errors as non-fatal if they don't happen repeatedly:
+		// - By default we disconnect on server errors: MaxRetries = 1.
+		// - First-try errors: we add the default backoff if none is
+		//   specified.
+		// - Retry the same CID. If it fails again, count that as a
+		//   server error and avoid retrying on that url.
+		// - If we have no more urls to try, we move to the next cid.
+		// - If we hit MaxRetries for all urls, abort all.
+
+		// In practice, our wantlists should be 1-3 elements. It
+		// doesn't make sense to tolerate 5 server errors for 3
+		// requests, as we would repeatedly hit broken servers that
+		// way. It is always better if endpoints keep these errors for
+		// server issues, and simply return 404 when they cannot find
+		// the content but everything else is fine.
+		err := fmt.Errorf("%q -> %d: %q", req.URL, statusCode, string(body))
+		log.Error(err)
+		retryAfter := resp.Header.Get("Retry-After")
+		cooldownUntil, ok := parseRetryAfter(retryAfter)
+		if ok { // it means we should retry, so we will retry.
+			sender.ht.cooldownTracker.setByDate(req.URL.Host, cooldownUntil)
+			u.cooldown.Store(cooldownUntil)
+		} else {
+			sender.ht.cooldownTracker.setByDuration(req.URL.Host, sender.opts.SendErrorBackoff)
+			u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff))
+		}
+
+		return nil, &senderError{
+			Type: typeRetryLater,
+			Err:  err,
+		}
+
+	// For any other code, we assume we must temporarily
+	// back off from the URL per the options.
+	// Tolerance for server errors per url is low. If after waiting etc.
+	// it fails MaxRetries times, we will fully disconnect.
+ default: + err := fmt.Errorf("%q -> %d: %q", req.URL, statusCode, string(body)) + log.Error(err) + sender.ht.cooldownTracker.setByDuration(req.URL.Host, sender.opts.SendErrorBackoff) + u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff)) + return nil, &senderError{ + Type: typeServer, + Err: err, + } + } +} + +// SendMsg performs an http request for the wanted cids per the msg's +// Wantlist. It reads the response and records it in a reponse BitswapMessage +// which is forwarded to the receivers (in a separate goroutine). +func (sender *httpMsgSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + // SendMsg gets called from MessageQueue and returning an error + // results in a MessageQueue shutdown. Errors are only returned when + // we are unable to obtain a single valid Block/Has response. When a + // URL errors in a bad way (connection, 500s), we continue checking + // with the next available one. + + // unless we have a wantlist, we bailout. + wantlist := msg.Wantlist() + lenWantlist := len(wantlist) + if lenWantlist == 0 { + return nil + } + + // Keep metrics of wantlists sent and how long it took + sender.ht.metrics.WantlistsTotal.Inc() + sender.ht.metrics.WantlistsItemsTotal.Add(float64(lenWantlist)) + log.Debugf("sending wantlist: %s (%d items)", sender.peer, lenWantlist) + now := time.Now() + defer func() { + sender.ht.metrics.WantlistsSeconds.Observe(float64(time.Since(now)) / float64(time.Second)) + }() + + // This is mostly a cosmetic action since the bitswap.Client is just + // logging the errors. + sendErrors := func(err error) { + if err != nil { + for _, recv := range sender.ht.receivers { + recv.ReceiveError(err) + } + } + } + + var err error + + // obtain contexts for all the entries in the wantlits. This allows + // us to react when cancels arrive to wantlists that we are going + // through. We use a Background context because requests will be + // ongoing when we return and the parent context is cancelled. + parentCtx := context.Background() + entryCtxs := make([]context.Context, len(wantlist)) + entryCancels := make([]context.CancelFunc, len(wantlist)) + nop := func() {} + for i, entry := range wantlist { + if entry.Cancel { + entryCtxs[i] = ctx + entryCancels[i] = nop + } else { + entryCtxs[i], entryCancels[i] = sender.ht.requestTracker.requestContext(parentCtx, entry.Cid) + } + } + + resultsCollector := make(chan httpResult, len(wantlist)) + + totalSent := 0 + +WANTLIST_LOOP: + for i, entry := range wantlist { + if entry.Cancel { // shortcut cancel entries. + sender.ht.requestTracker.cancelRequest(entry.Cid) + sender.ht.metrics.updateStatusCounter("CANCEL", 0, "") + // Do not observe request time for cancel requests as + // they cost us nothing, so it is unfair to compare + // against bsnet requests-time. + // sender.ht.metrics.RequestTime.Observe(float64(time.Since(reqStart)) + // / float64(time.Second)) + continue + } + log.Debugf("wantlist msg %d/%d: %s %s %s DH:%t", i, lenWantlist, sender.peer, entry.Cid, entry.WantType, entry.SendDontHave) + + reqInfo := httpRequestInfo{ + ctx: entryCtxs[i], + sender: sender, + entry: entry, + result: resultsCollector, + startTime: time.Now(), + } + + select { + case <-ctx.Done(): + // our context cancelled so we must abort. + err = ctx.Err() + break WANTLIST_LOOP + case sender.ht.httpRequests <- reqInfo: + totalSent++ + } + } + + if totalSent == 0 { + return nil + } + + // We are finished sending. Like bitswap/bsnet, we return. 
+ // Receiving results is async and we leave a goroutine taking care of + // that. + go func() { + bsresp := bsmsg.New(false) + totalResponses := 0 + + for result := range resultsCollector { + // Record total request time. + sender.ht.metrics.RequestTime.Observe(float64(time.Since(result.info.startTime)) / float64(time.Second)) + + entry := result.info.entry + + if result.err == nil { + sender.ht.connEvtMgr.OnMessage(sender.peer) + + if entry.WantType == pb.Message_Wantlist_Block { + bsresp.AddBlock(result.block) + } else { + bsresp.AddHave(entry.Cid) + } + } else { + + // error handling + switch result.err.Type { + case typeFatal: + log.Errorf("Disconnecting from %s: %s", sender.peer, result.err.Err) + sender.ht.DisconnectFrom(ctx, sender.peer) + err = result.err + // continue processing responses as workers + // might have done other requests in parallel + case typeClient: + if entry.SendDontHave { + bsresp.AddDontHave(entry.Cid) + } + case typeContext: // ignore and move on + case typeServer: // should not be returned as retried until fatal + default: + panic("unexpected returned error type") + } + } + // Leave loop when we read all what we + // expected. + totalResponses++ + if totalResponses >= totalSent { + close(resultsCollector) + break + } + } + + // We return a special "cancel" function that we need to call + // explicitally. This cleans up our request-tracker. + for _, cancel := range entryCancels { + cancel() + } + sender.ht.requestTracker.cleanEmptyRequests(wantlist) + sender.notifyReceivers(bsresp) + // This just logs errors apparently. + sendErrors(err) + }() + + // We never return error once we started sending. Whatever happened, + // we will be cooling down urls etc. but we don't need to disconnect + // or report that "peer is down" for the moment, as we disconnect + // manually on error. + return nil +} + +func (sender *httpMsgSender) notifyReceivers(bsresp bsmsg.BitSwapMessage) { + lb := len(bsresp.Blocks()) + lh := len(bsresp.Haves()) + ldh := len(bsresp.DontHaves()) + if lb+lh+ldh == 0 { // nothing to do + return + } + + for i, recv := range sender.ht.receivers { + log.Debugf("ReceiveMessage from %s#%d. Blocks: %d. Haves: %d", sender.peer, i, lb, lh) + recv.ReceiveMessage( + context.Background(), + sender.peer, + bsresp, + ) + } +} + +// Reset resets the sender (currently noop) +func (sender *httpMsgSender) Reset() error { + sender.closeOnce.Do(func() { + close(sender.closing) + }) + return nil +} + +// SupportsHave indicates whether the peer answers to HEAD requests. +// This has been probed during Connect(). +func (sender *httpMsgSender) SupportsHave() bool { + return supportsHave(sender.ht.host.Peerstore(), sender.peer) +} + +func supportsHave(pstore peerstore.Peerstore, p peer.ID) bool { + var haveSupport bool + v, err := pstore.Get(p, peerstoreSupportsHeadKey) + if err != nil { + haveSupport = false + } else { + b, ok := v.(bool) + haveSupport = ok && b + } + log.Debugf("supportsHave: %s %t", p, haveSupport) + return haveSupport +} + +// parseRetryAfter returns how many seconds the Retry-After header header +// wants us to wait. 
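+// Both forms from the HTTP spec are accepted: an integer number of
+// delay-seconds, or an HTTP (RFC 1123) date.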
+func parseRetryAfter(ra string) (time.Time, bool) { + if len(ra) == 0 { + return time.Time{}, false + } + secs, err := strconv.ParseInt(ra, 10, 64) + if err != nil { + date, err := time.Parse(time.RFC1123, ra) + if err != nil { + return time.Time{}, false + } + return date, true + } + if secs <= 0 { + return time.Time{}, false + } + + return time.Now().Add(time.Duration(secs) * time.Second), true +} diff --git a/bitswap/network/httpnet/pinger.go b/bitswap/network/httpnet/pinger.go new file mode 100644 index 000000000..67ce832e1 --- /dev/null +++ b/bitswap/network/httpnet/pinger.go @@ -0,0 +1,164 @@ +package httpnet + +import ( + "context" + "sync" + "time" + + "github.com/ipfs/boxo/bitswap/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "go.uber.org/multierr" +) + +// pinger pings connected hosts on regular intervals +// and tracks their latency. +type pinger struct { + ht *Network + + latenciesLock sync.RWMutex + latencies map[peer.ID]time.Duration + + pingsLock sync.Mutex + pings map[peer.ID]context.CancelFunc +} + +func newPinger(ht *Network, pingCid string) *pinger { + return &pinger{ + ht: ht, + latencies: make(map[peer.ID]time.Duration), + pings: make(map[peer.ID]context.CancelFunc), + } +} + +// ping sends a ping packet to the first known url of the given peer and +// returns the result with the latency for this peer. The result is also +// recorded. +func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { + pi := pngr.ht.host.Peerstore().PeerInfo(p) + urls := network.ExtractURLsFromPeer(pi) + if len(urls) == 0 { + return ping.Result{ + Error: ErrNoHTTPAddresses, + } + } + + method := "GET" + if supportsHave(pngr.ht.host.Peerstore(), p) { + method = "HEAD" + } + + results := make(chan ping.Result, len(urls)) + for _, u := range urls { + go func(u network.ParsedURL) { + start := time.Now() + err := pngr.ht.connect(ctx, p, u, method) + if err != nil { + log.Debug(err) + results <- ping.Result{Error: err} + return + } + results <- ping.Result{ + RTT: time.Since(start), + } + }(u) + } + + var result ping.Result + var errors error + for i := 0; i < len(urls); i++ { + r := <-results + if r.Error != nil { + errors = multierr.Append(errors, r.Error) + continue + } + result.RTT += r.RTT + } + close(results) + + lenErrors := len(multierr.Errors(errors)) + // if all urls failed return that, otherwise ignore. + if lenErrors == len(urls) { + return ping.Result{ + Error: errors, + } + } + result.RTT = result.RTT / time.Duration(len(urls)-lenErrors) + + //log.Debugf("ping latency %s %s", p, result.RTT) + pngr.recordLatency(p, result.RTT) + return result +} + +// latency returns the recorded latency for the given peer. +func (pngr *pinger) latency(p peer.ID) time.Duration { + var lat time.Duration + pngr.latenciesLock.RLock() + { + lat = pngr.latencies[p] + } + pngr.latenciesLock.RUnlock() + return lat +} + +// recordLatency stores a new latency measurement for the given peer using an +// Exponetially Weighted Moving Average similar to LatencyEWMA from the +// peerstore. +func (pngr *pinger) recordLatency(p peer.ID, next time.Duration) { + nextf := float64(next) + s := 0.1 + pngr.latenciesLock.Lock() + { + ewma, found := pngr.latencies[p] + ewmaf := float64(ewma) + if !found { + pngr.latencies[p] = next // when no data, just take it as the mean. 
+ } else { + nextf = ((1.0 - s) * ewmaf) + (s * nextf) + pngr.latencies[p] = time.Duration(nextf) + } + } + pngr.latenciesLock.Unlock() +} + +func (pngr *pinger) startPinging(p peer.ID) { + pngr.pingsLock.Lock() + defer pngr.pingsLock.Unlock() + + _, ok := pngr.pings[p] + if ok { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + pngr.pings[p] = cancel + + go func(ctx context.Context, p peer.ID) { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pngr.ping(ctx, p) + } + } + }(ctx, p) + +} + +func (pngr *pinger) stopPinging(p peer.ID) { + pngr.pingsLock.Lock() + { + cancel, ok := pngr.pings[p] + if ok { + cancel() + } + delete(pngr.pings, p) + } + pngr.pingsLock.Unlock() + pngr.latenciesLock.Lock() + delete(pngr.latencies, p) + pngr.latenciesLock.Unlock() + +} diff --git a/bitswap/network/httpnet/request_tracker.go b/bitswap/network/httpnet/request_tracker.go new file mode 100644 index 000000000..f7b92e4ff --- /dev/null +++ b/bitswap/network/httpnet/request_tracker.go @@ -0,0 +1,111 @@ +package httpnet + +import ( + "context" + "sync" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + "github.com/ipfs/go-cid" +) + +// requestTracker tracks requests to CIDs so that we can cancel all ongoing +// requests to a single CID. +type requestTracker struct { + mux sync.Mutex + ctxs map[cid.Cid]*ctxMap +} + +type ctxMap struct { + mux sync.RWMutex + m map[context.Context]context.CancelFunc +} + +// newRequestTracker creates a new requestTracker. A request tracker provides +// a context for a CID-request. All contexts for a given CID can be +// cancelled at once with cancelRequest(). +func newRequestTracker() *requestTracker { + return &requestTracker{ + ctxs: make(map[cid.Cid]*ctxMap), + } +} + +// requestContext returns a new context to make a request for a cid. The +// context will be cancelled if cancelRequest() is called for the same +// CID. +func (rc *requestTracker) requestContext(ctx context.Context, c cid.Cid) (context.Context, context.CancelFunc) { + var cidCtxs *ctxMap + var ok bool + rc.mux.Lock() + { + cidCtxs, ok = rc.ctxs[c] + if !ok { + cidCtxs = &ctxMap{m: make(map[context.Context]context.CancelFunc)} + rc.ctxs[c] = cidCtxs + } + } + rc.mux.Unlock() + + // derive the context we will return + rCtx, rCancel := context.WithCancel(ctx) + + // store it + cidCtxs.mux.Lock() + { + cidCtxs.m[rCtx] = rCancel + } + cidCtxs.mux.Unlock() + // Special cancel function to clean up entry + return rCtx, func() { + rCancel() + cidCtxs.mux.Lock() + delete(cidCtxs.m, rCtx) + cidCtxs.mux.Unlock() + } +} + +// cancelRequest cancels all contexts obtained via requestContext for the +// given CID. +func (rc *requestTracker) cancelRequest(c cid.Cid) { + var cidCtxs *ctxMap + var ok bool + rc.mux.Lock() + { + cidCtxs, ok = rc.ctxs[c] + delete(rc.ctxs, c) + } + rc.mux.Unlock() + + if !ok { + return + } + + log.Debugf("cancelling all requests for %s", c) + cidCtxs.mux.Lock() + { + for _, cancel := range cidCtxs.m { + cancel() + } + } + cidCtxs.mux.Unlock() +} + +// cleanEmptyRequests uses a single lock to perform tracker-cleanup for a +// given list wantlist when no contexts for the entry CID exist. 
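+// It runs after SendMsg has cancelled the per-entry request contexts.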
+func (rc *requestTracker) cleanEmptyRequests(wantlist []bsmsg.Entry) { + rc.mux.Lock() + { + for _, e := range wantlist { + cidCtxs, ok := rc.ctxs[e.Cid] + if !ok { + continue + } + cidCtxs.mux.RLock() + if len(cidCtxs.m) == 0 { + delete(rc.ctxs, e.Cid) + } + cidCtxs.mux.RUnlock() + + } + } + rc.mux.Unlock() +} diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go index eef4a52b2..89ec990f5 100644 --- a/bitswap/network/interface.go +++ b/bitswap/network/interface.go @@ -5,29 +5,16 @@ import ( "time" bsmsg "github.com/ipfs/boxo/bitswap/message" - "github.com/ipfs/boxo/bitswap/network/internal" + cid "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/connmgr" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) -var ( - // ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol - ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers - // ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol - ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero - // ProtocolBitswapOneOne is the prefix for version 1.1.0 - ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne - // ProtocolBitswap is the current version of the bitswap protocol: 1.2.0 - ProtocolBitswap = internal.ProtocolBitswap -) - // BitSwapNetwork provides network connectivity for BitSwap sessions. type BitSwapNetwork interface { - Self() peer.ID - // SendMessage sends a BitSwap message to a peer. SendMessage( context.Context, @@ -44,18 +31,25 @@ type BitSwapNetwork interface { NewMessageSender(context.Context, peer.ID, *MessageSenderOpts) (MessageSender, error) - ConnectionManager() connmgr.ConnManager - Stats() Stats + Self() peer.ID Pinger + PeerTagger +} + +// PeerTagger is an interface for tagging peers with metadata +type PeerTagger interface { + TagPeer(peer.ID, string, int) + UntagPeer(peer.ID, string) + Protect(peer.ID, string) + Unprotect(peer.ID, string) bool } // MessageSender is an interface for sending a series of messages over the bitswap // network type MessageSender interface { SendMsg(context.Context, bsmsg.BitSwapMessage) error - Close() error Reset() error // Indicates whether the remote peer supports HAVE / DONT_HAVE messages SupportsHave() bool diff --git a/bitswap/network/router.go b/bitswap/network/router.go new file mode 100644 index 000000000..55fe855f2 --- /dev/null +++ b/bitswap/network/router.go @@ -0,0 +1,207 @@ +package network + +import ( + "context" + "time" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +type router struct { + Bitswap BitSwapNetwork + HTTP BitSwapNetwork + Peerstore peerstore.Peerstore +} + +// New returns a BitSwapNetwork supported by underlying IPFS host. +func New(pstore peerstore.Peerstore, bitswap BitSwapNetwork, http BitSwapNetwork) BitSwapNetwork { + if bitswap == nil && http == nil { + panic("bad exchange network router initialization: need bitswap or http") + } + + if http == nil { + return bitswap + } + + if bitswap == nil { + return http + } + + if http.Self() != bitswap.Self() { + panic("http and bitswap network report different peer IDs") + } + + return &router{ + Peerstore: pstore, + Bitswap: bitswap, + HTTP: http, + } +} + +func (rt *router) Start(receivers ...Receiver) { + // This creates two connectionEventManagers. 
A connection manager has + // the power to remove a peer from everywhere on a Disconnect() event, + // so we need to be careful so that the Bitswap connection manager + // does not step in when we are using HTTP and vice-versa. + rt.Bitswap.Start(receivers...) + rt.HTTP.Start(receivers...) +} + +func (rt *router) Stop() { + rt.Bitswap.Stop() + rt.HTTP.Stop() +} + +// Self returns the peer ID of the network. +func (rt *router) Self() peer.ID { + // Self is used on the bitswap server, + // and on the client on the message queue + // and session manager. + // We ensure during initialization that we are using + // the same host for both networks. + return rt.Bitswap.Self() +} + +func (rt *router) Ping(ctx context.Context, p peer.ID) ping.Result { + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.Ping(ctx, p) + } + return rt.Bitswap.Ping(ctx, p) +} + +func (rt *router) Latency(p peer.ID) time.Duration { + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.Latency(p) + } + return rt.Bitswap.Latency(p) +} + +func (rt *router) SendMessage(ctx context.Context, p peer.ID, msg bsmsg.BitSwapMessage) error { + // SendMessage is only used by bitswap server on sendBlocks(). We + // should not be passing a router to the bitswap server but we try to + // make our best. + + // If the message has blocks, send it via bitswap. + if len(msg.Blocks()) > 0 { + return rt.Bitswap.SendMessage(ctx, p, msg) + } + + // Otherwise, assume it's a wantlist. Follow usual prioritization + // of HTTP when possible. + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.SendMessage(ctx, p, msg) + } + return rt.Bitswap.SendMessage(ctx, p, msg) +} + +// Connect attempts to connect to a peer. It prioritizes HTTP connections over +// bitswap, but does a bitswap connect if HTTP Connect fails (i.e. when there +// are no HTTP addresses. +func (rt *router) Connect(ctx context.Context, p peer.AddrInfo) error { + htaddrs, bsaddrs := SplitHTTPAddrs(p) + if len(htaddrs.Addrs) == 0 { + return rt.Bitswap.Connect(ctx, bsaddrs) + } + + err := rt.HTTP.Connect(ctx, htaddrs) + if err != nil { + return rt.Bitswap.Connect(ctx, bsaddrs) + } + return nil +} + +func (rt *router) DisconnectFrom(ctx context.Context, p peer.ID) error { + // DisconnectFrom is only called from bitswap.Server, on failures + // receiving a bitswap message. On HTTP, we don't "disconnect" unless + // there are retrieval failures, which we handle internally. + // + // Result: only disconnect bitswap, when there are bitswap addresses + // involved. + pi := rt.Peerstore.PeerInfo(p) + _, bsaddrs := SplitHTTPAddrs(pi) + if len(bsaddrs.Addrs) > 0 { + return rt.Bitswap.DisconnectFrom(ctx, p) + } + return nil +} + +func (rt *router) Stats() Stats { + htstats := rt.HTTP.Stats() + bsstats := rt.Bitswap.Stats() + return Stats{ + MessagesRecvd: htstats.MessagesRecvd + bsstats.MessagesRecvd, + MessagesSent: htstats.MessagesSent + bsstats.MessagesSent, + } +} + +// NewMessageSender returns a MessageSender using the HTTP network when HTTP +// addresses are known, and bitswap otherwise. +func (rt *router) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) { + // IF we did not manage to connect to any HTTP address beforehand, we + // should not have them in the peerstore. 
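+	// (httpnet's Connect only stores addresses whose GET/HEAD probe
+	// succeeded, so their presence implies a working endpoint.)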
+ pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.NewMessageSender(ctx, p, opts) + } + return rt.Bitswap.NewMessageSender(ctx, p, opts) +} + +func (rt *router) TagPeer(p peer.ID, tag string, w int) { + // tag once only if they are the same. + if rt.HTTP.Self() == rt.Bitswap.Self() { + rt.HTTP.TagPeer(p, tag, w) + return + } + + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + rt.HTTP.TagPeer(p, tag, w) + return + } + rt.Bitswap.TagPeer(p, tag, w) +} + +func (rt *router) UntagPeer(p peer.ID, tag string) { + // tag once only if they are the same. + if rt.HTTP.Self() == rt.Bitswap.Self() { + rt.HTTP.UntagPeer(p, tag) + return + } + + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + rt.HTTP.UntagPeer(p, tag) + return + } + rt.Bitswap.UntagPeer(p, tag) +} + +func (rt *router) Protect(p peer.ID, tag string) { + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + rt.HTTP.Protect(p, tag) + return + } + rt.Bitswap.Protect(p, tag) +} +func (rt *router) Unprotect(p peer.ID, tag string) bool { + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.Unprotect(p, tag) + } + return rt.Bitswap.Unprotect(p, tag) +} diff --git a/bitswap/server/server.go b/bitswap/server/server.go index caaf55501..9245751b1 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -81,7 +81,7 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl s.engine = decision.NewEngine( ctx, bstore, - network.ConnectionManager(), + network, network.Self(), s.engineOptions..., ) diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index f09831b65..fae877623 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -5,7 +5,8 @@ import ( "time" "github.com/ipfs/boxo/bitswap" - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" tn "github.com/ipfs/boxo/bitswap/testnet" blockstore "github.com/ipfs/boxo/blockstore" mockrouting "github.com/ipfs/boxo/routing/mock" @@ -92,7 +93,7 @@ type Instance struct { Datastore ds.Batching Exchange *bitswap.Bitswap Blockstore blockstore.Blockstore - Adapter bsnet.BitSwapNetwork + Adapter iface.BitSwapNetwork Routing routing.Routing blockstoreDelay delay.D } diff --git a/bitswap/testnet/interface.go b/bitswap/testnet/interface.go index 4320c8d90..224b449cf 100644 --- a/bitswap/testnet/interface.go +++ b/bitswap/testnet/interface.go @@ -1,7 +1,8 @@ package bitswap import ( - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" ) @@ -9,7 +10,7 @@ import ( // Network is an interface for generating bitswap network interfaces // based on a test network. 
type Network interface { - Adapter(tnet.Identity, ...bsnet.NetOpt) bsnet.BitSwapNetwork + Adapter(tnet.Identity, ...bsnet.NetOpt) iface.BitSwapNetwork HasPeer(peer.ID) bool } diff --git a/bitswap/testnet/peernet.go b/bitswap/testnet/peernet.go index 01608f755..6ae7a0aad 100644 --- a/bitswap/testnet/peernet.go +++ b/bitswap/testnet/peernet.go @@ -3,7 +3,8 @@ package bitswap import ( "context" - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -18,7 +19,7 @@ func StreamNet(ctx context.Context, net mockpeernet.Mocknet) (Network, error) { return &peernet{net}, nil } -func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { +func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) iface.BitSwapNetwork { client, err := pn.Mocknet.AddPeer(p.PrivateKey(), p.Address()) if err != nil { panic(err.Error()) diff --git a/bitswap/testnet/virtual.go b/bitswap/testnet/virtual.go index 442ed00f1..556529acf 100644 --- a/bitswap/testnet/virtual.go +++ b/bitswap/testnet/virtual.go @@ -10,7 +10,8 @@ import ( "github.com/gammazero/deque" bsmsg "github.com/ipfs/boxo/bitswap/message" - bsnet "github.com/ipfs/boxo/bitswap/network" + iface "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" delay "github.com/ipfs/go-ipfs-delay" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/connmgr" @@ -80,7 +81,7 @@ type receiverQueue struct { lk sync.Mutex } -func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { +func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) iface.BitSwapNetwork { n.mu.Lock() defer n.mu.Unlock() @@ -176,14 +177,14 @@ func (n *network) SendMessage( return nil } -var _ bsnet.Receiver = (*networkClient)(nil) +var _ iface.Receiver = (*networkClient)(nil) type networkClient struct { // These need to be at the top of the struct (allocated on the heap) for alignment on 32bit platforms. 
- stats bsnet.Stats + stats iface.Stats local peer.ID - receivers []bsnet.Receiver + receivers []iface.Receiver network *network supportedProtocols []protocol.ID } @@ -238,8 +239,8 @@ func (nc *networkClient) SendMessage( return nil } -func (nc *networkClient) Stats() bsnet.Stats { - return bsnet.Stats{ +func (nc *networkClient) Stats() iface.Stats { + return iface.Stats{ MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd), MessagesSent: atomic.LoadUint64(&nc.stats.MessagesSent), } @@ -284,7 +285,7 @@ func (mp *messagePasser) SupportsHave() bool { return false } -func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) { +func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) { return &messagePasser{ net: nc, target: p, @@ -293,7 +294,7 @@ func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts * }, nil } -func (nc *networkClient) Start(r ...bsnet.Receiver) { +func (nc *networkClient) Start(r ...iface.Receiver) { nc.receivers = r } @@ -343,6 +344,19 @@ func (nc *networkClient) DisconnectFrom(_ context.Context, p peer.ID) error { return nil } +func (bsnet *networkClient) TagPeer(p peer.ID, tag string, w int) { +} + +func (bsnet *networkClient) UntagPeer(p peer.ID, tag string) { +} + +func (bsnet *networkClient) Protect(p peer.ID, tag string) { +} + +func (bsnet *networkClient) Unprotect(p peer.ID, tag string) bool { + return false +} + func (rq *receiverQueue) enqueue(m *message) { rq.lk.Lock() defer rq.lk.Unlock() diff --git a/examples/bitswap-transfer/main.go b/examples/bitswap-transfer/main.go index fc2d5ded3..b155e971f 100644 --- a/examples/bitswap-transfer/main.go +++ b/examples/bitswap-transfer/main.go @@ -34,7 +34,7 @@ import ( uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" bsclient "github.com/ipfs/boxo/bitswap/client" - bsnet "github.com/ipfs/boxo/bitswap/network" + bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" bsserver "github.com/ipfs/boxo/bitswap/server" "github.com/ipfs/boxo/files" ) diff --git a/examples/go.mod b/examples/go.mod index e76b6a1f4..db0ffbdd9 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -12,7 +12,7 @@ require ( github.com/libp2p/go-libp2p v0.40.0 github.com/multiformats/go-multiaddr v0.14.0 github.com/multiformats/go-multicodec v0.9.0 - github.com/prometheus/client_golang v1.20.5 + github.com/prometheus/client_golang v1.21.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 go.opentelemetry.io/contrib/propagators/autoprop v0.46.1 @@ -75,7 +75,7 @@ require ( github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipfs/go-merkledag v0.11.0 // indirect - github.com/ipfs/go-metrics-interface v0.0.1 // indirect + github.com/ipfs/go-metrics-interface v0.3.0 // indirect github.com/ipfs/go-peertaskqueue v0.8.2 // indirect github.com/ipfs/go-unixfsnode v1.9.2 // indirect github.com/ipfs/go-verifcid v0.0.3 // indirect diff --git a/examples/go.sum b/examples/go.sum index 7fa56b5d8..d2decfdd5 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -213,8 +213,8 @@ github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= github.com/ipfs/go-merkledag v0.11.0 h1:DgzwK5hprESOzS4O1t/wi6JDpyVQdvm9Bs59N/jqfBY= github.com/ipfs/go-merkledag v0.11.0/go.mod 
diff --git a/examples/go.mod b/examples/go.mod
index e76b6a1f4..db0ffbdd9 100644
--- a/examples/go.mod
+++ b/examples/go.mod
@@ -12,7 +12,7 @@ require (
 	github.com/libp2p/go-libp2p v0.40.0
 	github.com/multiformats/go-multiaddr v0.14.0
 	github.com/multiformats/go-multicodec v0.9.0
-	github.com/prometheus/client_golang v1.20.5
+	github.com/prometheus/client_golang v1.21.0
 	github.com/stretchr/testify v1.10.0
 	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0
 	go.opentelemetry.io/contrib/propagators/autoprop v0.46.1
@@ -75,7 +75,7 @@ require (
 	github.com/ipfs/go-log v1.0.5 // indirect
 	github.com/ipfs/go-log/v2 v2.5.1 // indirect
 	github.com/ipfs/go-merkledag v0.11.0 // indirect
-	github.com/ipfs/go-metrics-interface v0.0.1 // indirect
+	github.com/ipfs/go-metrics-interface v0.3.0 // indirect
 	github.com/ipfs/go-peertaskqueue v0.8.2 // indirect
 	github.com/ipfs/go-unixfsnode v1.9.2 // indirect
 	github.com/ipfs/go-verifcid v0.0.3 // indirect
diff --git a/examples/go.sum b/examples/go.sum
index 7fa56b5d8..d2decfdd5 100644
--- a/examples/go.sum
+++ b/examples/go.sum
@@ -213,8 +213,8 @@ github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
 github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
 github.com/ipfs/go-merkledag v0.11.0 h1:DgzwK5hprESOzS4O1t/wi6JDpyVQdvm9Bs59N/jqfBY=
 github.com/ipfs/go-merkledag v0.11.0/go.mod h1:Q4f/1ezvBiJV0YCIXvt51W/9/kqJGH4I1LsA7+djsM4=
-github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
-github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
+github.com/ipfs/go-metrics-interface v0.3.0 h1:YwG7/Cy4R94mYDUuwsBfeziJCVm9pBMJ6q/JR9V40TU=
+github.com/ipfs/go-metrics-interface v0.3.0/go.mod h1:OxxQjZDGocXVdyTPocns6cOLwHieqej/jos7H4POwoY=
 github.com/ipfs/go-peertaskqueue v0.8.2 h1:PaHFRaVFdxQk1Qo3OKiHPYjmmusQy7gKQUaL8JDszAU=
 github.com/ipfs/go-peertaskqueue v0.8.2/go.mod h1:L6QPvou0346c2qPJNiJa6BvOibxDfaiPlqHInmzg0FA=
 github.com/ipfs/go-test v0.0.4 h1:DKT66T6GBB6PsDFLoO56QZPrOmzJkqU1FZH5C9ySkew=
@@ -415,8 +415,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
 github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4=
 github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw=
 github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
-github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
-github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
+github.com/prometheus/client_golang v1.21.0 h1:DIsaGmiaBkSangBgMtWdNfxbMNdku5IK6iNhrEqWvdA=
+github.com/prometheus/client_golang v1.21.0/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
diff --git a/go.mod b/go.mod
index 4109acba4..75a4f43ed 100644
--- a/go.mod
+++ b/go.mod
@@ -28,7 +28,7 @@ require (
 	github.com/ipfs/go-ipld-format v0.6.0
 	github.com/ipfs/go-ipld-legacy v0.2.1
 	github.com/ipfs/go-log/v2 v2.5.1
-	github.com/ipfs/go-metrics-interface v0.0.1
+	github.com/ipfs/go-metrics-interface v0.3.0
 	github.com/ipfs/go-peertaskqueue v0.8.2
 	github.com/ipfs/go-test v0.0.4
 	github.com/ipfs/go-unixfsnode v1.9.2
@@ -54,7 +54,7 @@ require (
 	github.com/multiformats/go-multihash v0.2.3
 	github.com/multiformats/go-multistream v0.6.0
 	github.com/polydawn/refmt v0.89.0
-	github.com/prometheus/client_golang v1.20.5
+	github.com/prometheus/client_golang v1.21.0
 	github.com/samber/lo v1.47.0
 	github.com/slok/go-http-metrics v0.12.0
 	github.com/spaolacci/murmur3 v1.1.0
diff --git a/go.sum b/go.sum
index 868380932..37e9879d8 100644
--- a/go.sum
+++ b/go.sum
@@ -216,8 +216,8 @@ github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
 github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
 github.com/ipfs/go-merkledag v0.11.0 h1:DgzwK5hprESOzS4O1t/wi6JDpyVQdvm9Bs59N/jqfBY=
 github.com/ipfs/go-merkledag v0.11.0/go.mod h1:Q4f/1ezvBiJV0YCIXvt51W/9/kqJGH4I1LsA7+djsM4=
-github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
-github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
+github.com/ipfs/go-metrics-interface v0.3.0 h1:YwG7/Cy4R94mYDUuwsBfeziJCVm9pBMJ6q/JR9V40TU=
+github.com/ipfs/go-metrics-interface v0.3.0/go.mod h1:OxxQjZDGocXVdyTPocns6cOLwHieqej/jos7H4POwoY=
 github.com/ipfs/go-peertaskqueue v0.8.2 h1:PaHFRaVFdxQk1Qo3OKiHPYjmmusQy7gKQUaL8JDszAU=
 github.com/ipfs/go-peertaskqueue v0.8.2/go.mod h1:L6QPvou0346c2qPJNiJa6BvOibxDfaiPlqHInmzg0FA=
 github.com/ipfs/go-test v0.0.4 h1:DKT66T6GBB6PsDFLoO56QZPrOmzJkqU1FZH5C9ySkew=
@@ -416,8 +416,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
 github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4=
 github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw=
 github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
-github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
-github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
+github.com/prometheus/client_golang v1.21.0 h1:DIsaGmiaBkSangBgMtWdNfxbMNdku5IK6iNhrEqWvdA=
+github.com/prometheus/client_golang v1.21.0/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
diff --git a/routing/providerquerymanager/providerquerymanager.go b/routing/providerquerymanager/providerquerymanager.go
index bdd51fbdd..e70faea19 100644
--- a/routing/providerquerymanager/providerquerymanager.go
+++ b/routing/providerquerymanager/providerquerymanager.go
@@ -97,6 +97,7 @@ type ProviderQueryManager struct {
 	maxProviders         int
 	maxInProcessRequests int

+	ignorePeers map[peer.ID]struct{}
 	// do not touch outside the run loop
 	inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus
@@ -133,6 +134,17 @@ func WithMaxProviders(count int) Option {
 	}
 }

+// WithIgnoreProviders configures the manager to ignore provider records from the given peers.
+func WithIgnoreProviders(peers ...peer.ID) Option {
+	return func(mgr *ProviderQueryManager) error {
+		mgr.ignorePeers = make(map[peer.ID]struct{})
+		for _, p := range peers {
+			mgr.ignorePeers[p] = struct{}{}
+		}
+		return nil
+	}
+}
+
 // New initializes a new ProviderQueryManager for a given context and a given
 // network provider.
 func New(dialer ProviderQueryDialer, router routing.ContentDiscovery, opts ...Option) (*ProviderQueryManager, error) {
@@ -356,6 +368,12 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
 			wg.Add(1)
 			go func(p peer.AddrInfo) {
 				defer wg.Done()
+
+				// Skip provider records from peers configured to be ignored.
+				if _, ok := pqm.ignorePeers[p.ID]; ok {
+					return
+				}
+
 				span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p.ID)))
 				err := pqm.dialer.Connect(findProviderCtx, p)
 				if err != nil && err != swarm.ErrDialToSelf {
diff --git a/routing/providerquerymanager/providerquerymanager_test.go b/routing/providerquerymanager/providerquerymanager_test.go
index 8026c5364..43c21f659 100644
--- a/routing/providerquerymanager/providerquerymanager_test.go
+++ b/routing/providerquerymanager/providerquerymanager_test.go
@@ -436,3 +436,26 @@ func TestLimitedProviders(t *testing.T) {
 		t.Fatal("returned more providers than requested")
 	}
 }
+
+func TestIgnorePeers(t *testing.T) {
+	peers := random.Peers(5)
+	fpd := &fakeProviderDialer{}
+	fpn := &fakeProviderDiscovery{
+		peersFound: peers,
+	}
+
+	providerQueryManager := mustNotErr(New(fpd, fpn,
+		WithIgnoreProviders(peers[0:4]...),
+	))
+	defer providerQueryManager.Close()
+	keys := random.Cids(1)
+
+	providersChan := providerQueryManager.FindProvidersAsync(context.Background(), keys[0], 0)
+	total := 0
+	for range providersChan {
+		total++
+	}
+	if total != 1 {
+		t.Fatal("did not ignore 4 of the peers")
+	}
+}
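To round out the new option, a hypothetical usage sketch mirroring what `TestIgnorePeers` exercises above. The helper name `newIgnoringPQM`, the choice of ignoring our own peer ID, and the provider cap of 10 are illustrative; `New`, `WithIgnoreProviders`, and `WithMaxProviders` are the APIs shown in this diff, and the `routing.ContentDiscovery` import path is assumed to be libp2p's `core/routing`:

```go
package main

import (
	"github.com/ipfs/boxo/routing/providerquerymanager"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/routing"
)

// newIgnoringPQM wires a ProviderQueryManager that drops provider records
// advertising `self`, so lookups never hand back our own node. The dialer
// and router arguments are whatever implementations the caller already has.
func newIgnoringPQM(
	dialer providerquerymanager.ProviderQueryDialer,
	router routing.ContentDiscovery,
	self peer.ID,
) (*providerquerymanager.ProviderQueryManager, error) {
	return providerquerymanager.New(dialer, router,
		providerquerymanager.WithIgnoreProviders(self),
		providerquerymanager.WithMaxProviders(10), // arbitrary cap, for illustration
	)
}
```

Remember to call `Close()` on the returned manager when done, as the test above does.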