diff --git a/go.mod b/go.mod index aa40d01..8aec920 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 github.com/ipfs-shipyard/nopfs v0.0.12 github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a - github.com/ipfs/boxo v0.21.0 + github.com/ipfs/boxo v0.21.1-0.20240726111146-e95eeb2ae5f1 github.com/ipfs/go-block-format v0.2.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 diff --git a/go.sum b/go.sum index 05577df..046c37c 100644 --- a/go.sum +++ b/go.sum @@ -237,8 +237,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a h1:MKG github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.21.0 h1:XpGXb+TQQ0IUdYaeAxGzWjSs6ow/Lce148A/2IbRDVE= -github.com/ipfs/boxo v0.21.0/go.mod h1:NmweAYeY1USOaJJxouy7DLr/Y5M8UBSsCI2KRivO+TY= +github.com/ipfs/boxo v0.21.1-0.20240726111146-e95eeb2ae5f1 h1:wsetxKWIhOhGi8exgTrZfhxiky76YSwdTcm1ZdcIqAU= +github.com/ipfs/boxo v0.21.1-0.20240726111146-e95eeb2ae5f1/go.mod h1:NmweAYeY1USOaJJxouy7DLr/Y5M8UBSsCI2KRivO+TY= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= diff --git a/setup_bitswap.go b/setup_bitswap.go index 374406d..7cdc9f4 100644 --- a/setup_bitswap.go +++ b/setup_bitswap.go @@ -2,6 +2,8 @@ package main import ( "context" + "github.com/ipfs/boxo/routing/providerquerymanager" + "github.com/libp2p/go-libp2p/core/peerstore" "time" "github.com/ipfs/boxo/bitswap" @@ -23,6 +25,12 @@ import ( func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface { bsctx := metri.CtxScope(ctx, "ipfs_bitswap") + n := &providerQueryNetwork{cr, h} + pqm, err := providerquerymanager.New(ctx, n, providerquerymanager.WithMaxInProcessRequests(100)) + if err != nil { + panic(err) + } + cr = &wrapProv{pqm: pqm} bn := bsnet.NewFromIpfsHost(h, cr) // --- Client Options @@ -33,6 +41,14 @@ func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routi // bitswap.ProviderSearchDelay: default is 1 second. providerSearchDelay := 1 * time.Second + // --- Bitswap Client Options + clientOpts := []bsclient.Option{ + bsclient.RebroadcastDelay(rebroadcastDelay), + bsclient.ProviderSearchDelay(providerSearchDelay), + bsclient.WithoutDuplicatedBlockStats(), + bsclient.WithDefaultLookupManagement(false), + } + // If peering and shared cache are both enabled, we initialize both a // Client and a Server with custom request filter and custom options. // client+server is more expensive but necessary when deployment requires @@ -50,37 +66,88 @@ func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routi return ok } - // Initialize client+server - bswap := bitswap.New(bsctx, bn, bstore, - // --- Client Options - bitswap.RebroadcastDelay(rebroadcastDelay), - bitswap.ProviderSearchDelay(providerSearchDelay), - bitswap.WithoutDuplicatedBlockStats(), + // turn bitswap clients option into bitswap options + var opts []bitswap.Option + for _, o := range clientOpts { + opts = append(opts, bitswap.WithClientOption(o)) + } - // ---- Server Options + // ---- Server Options + opts = append(opts, bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter), bitswap.ProvideEnabled(false), // Do not keep track of other peer's wantlists, we only want to reply if we // have a block. If we get it later, it's no longer relevant. bitswap.WithPeerLedger(&noopPeerLedger{}), // When we don't have a block, don't reply. This reduces processment. - bitswap.SetSendDontHaves(false), - ) + bitswap.SetSendDontHaves(false)) + + // Initialize client+server + bswap := bitswap.New(bsctx, bn, bstore, opts...) bn.Start(bswap) return &noNotifyExchange{bswap} } // By default, rainbow runs with bitswap client alone - bswap := bsclient.New(bsctx, bn, bstore, - // --- Client Options - bsclient.RebroadcastDelay(rebroadcastDelay), - bsclient.ProviderSearchDelay(providerSearchDelay), - bsclient.WithoutDuplicatedBlockStats(), - ) + bswap := bsclient.New(bsctx, bn, bstore, clientOpts...) bn.Start(bswap) return bswap } +type providerQueryNetwork struct { + routing.ContentRouting + host.Host +} + +func (p *providerQueryNetwork) ConnectTo(ctx context.Context, id peer.ID) error { + return p.Host.Connect(ctx, peer.AddrInfo{ID: id}) +} + +func (p *providerQueryNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.ID { + out := make(chan peer.ID, i) + go func() { + defer close(out) + providers := p.ContentRouting.FindProvidersAsync(ctx, c, i) + for info := range providers { + if info.ID == p.Host.ID() { + continue // ignore self as provider + } + p.Host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL) + select { + case <-ctx.Done(): + return + case out <- info.ID: + } + } + }() + return out +} + +type wrapProv struct { + pqm *providerquerymanager.ProviderQueryManager +} + +var _ routing.ContentRouting = (*wrapProv)(nil) + +func (r *wrapProv) Provide(ctx context.Context, c cid.Cid, b bool) error { + return routing.ErrNotSupported +} + +func (r *wrapProv) FindProvidersAsync(ctx context.Context, c cid.Cid, _ int) <-chan peer.AddrInfo { + retCh := make(chan peer.AddrInfo) + go func() { + defer close(retCh) + provsCh := r.pqm.FindProvidersAsync(ctx, c) + for p := range provsCh { + select { + case retCh <- peer.AddrInfo{ID: p}: + case <-ctx.Done(): + } + } + }() + return retCh +} + type noopPeerLedger struct{} func (*noopPeerLedger) Wants(p peer.ID, e wl.Entry) {}