diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d32d0b6d..84e8558b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,52 @@ The following emojis are used to highlight certain changes: ## [Unreleased] +- `bitswap`, `routing`, `exchange` ([#641](https://github.com/ipfs/boxo/pull/641)): + - ✨ Bitswap is no longer in charge of providing blocks to the network: providing functionality is now handled by an `exchange/providing.Exchange`, meant to be used with `provider.System` so that all provides follow the same rules (previously, multiple parts of the code were handling provides). + - 🛠 `bitswap/client/internal/providerquerymanager` has been moved to `routing/providerquerymanager` where it belongs. To keep compatibility, Bitswap now receives a `routing.ContentDiscovery` parameter which implements `FindProvidersAsync(...)` and uses it to create a `providerquerymanager` with the same default settings as before. For custom settings, use a custom `providerquerymanager` to manually wrap a `ContentDiscovery` object and pass that in as `ContentDiscovery` on initialization, while setting `bitswap.WithDefaultProviderQueryManager(false)` (to avoid wrapping it again); an illustrative sketch of this wiring appears after the diff. + - The revamped `providerQueryManager` will trigger lookups until it manages to connect to `MaxProviders`. Previously it would look up at most `MaxInProcessRequests*MaxProviders`, and connection failures could limit the actual number of providers found. + - 🛠 We have aligned our routing-related interfaces with the libp2p [`routing`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/routing#ContentRouting) ones, including in the `reprovider.System`. + - To obtain exactly the same behaviour as before (in particular, ensuring that new blocks are still provided), code that looked like this: + +```go + bswapnet := network.NewFromIpfsHost(host, contentRouter) + bswap := bitswap.New(p.ctx, bswapnet, blockstore) + bserv = blockservice.New(blockstore, bswap) +``` + - becomes: + +```go + // Create network: no contentRouter anymore + bswapnet := network.NewFromIpfsHost(host) + // Create Bitswap: a new "discovery" parameter, usually the "contentRouter" + // which does both discovery and providing. + bswap := bitswap.New(p.ctx, bswapnet, discovery, blockstore) + // A provider system that handles concurrent provides etc. "contentProvider" + // is usually the "contentRouter" which does both discovery and providing. + // "contentProvider" could be used directly without wrapping, but wrapping + // is recommended so that providing happens more efficiently. + provider := provider.New(datastore, provider.Online(contentProvider)) + // A wrapped providing exchange using the previous exchange and the provider. + exch := providing.New(bswap, provider) + + // Finally the blockservice + bserv := blockservice.New(blockstore, exch) + ... +``` + + - The above is only necessary if content routing is needed. Otherwise: + +```go + // Create network: no contentRouter anymore + bswapnet := network.NewFromIpfsHost(host) + // Create Bitswap: a new "discovery" parameter set to nil (disable content discovery) + bswap := bitswap.New(p.ctx, bswapnet, nil, blockstore) + // Finally the blockservice + bserv := blockservice.New(blockstore, bswap) +``` + + + ### Added - `routing/http/server`: added built-in Prometheus instrumentation to http delegated `/routing/v1/` endpoints, with custom buckets for response size and duration to match real world data observed at [the `delegated-ipfs.dev` instance](https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing).
[#718](https://github.com/ipfs/boxo/pull/718) [#724](https://github.com/ipfs/boxo/pull/724) @@ -117,6 +163,7 @@ The following emojis are used to highlight certain changes: - `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636) - `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651) - `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629) +- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers; instead, use the `provider` package, which uses a datastore queue and batches calls to ProvideMany. ## [v0.21.0] diff --git a/bitswap/benchmarks_test.go b/bitswap/benchmarks_test.go index bd8f342ea..d1930c900 100644 --- a/bitswap/benchmarks_test.go +++ b/bitswap/benchmarks_test.go @@ -12,17 +12,16 @@ import ( "testing" "time" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-test/random" - protocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/ipfs/boxo/bitswap" bsnet "github.com/ipfs/boxo/bitswap/network" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" mockrouting "github.com/ipfs/boxo/routing/mock" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" + "github.com/ipfs/go-test/random" + protocol "github.com/libp2p/go-libp2p/core/protocol" ) type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) @@ -135,6 +134,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) { benchmarkLog = nil fixedDelay := delay.Fixed(10 * time.Millisecond) bstoreLatency := time.Duration(0) + router := mockrouting.NewServer() for _, bch := range mixedBenches { b.Run(bch.name, func(b *testing.B) { @@ -142,17 +142,17 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) { oldSeedCount := bch.oldSeedCount newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount) - net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay) + net := tn.VirtualNetwork(fixedDelay) // Simulate an older Bitswap node (old protocol ID) that doesn't // send DONT_HAVE responses oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne} oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)} oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)} - oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts) + oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, oldNetOpts, oldBsOpts) // Regular new Bitswap node - newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil) + newNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, nil, nil) var instances []testinstance.Instance // Create new nodes (fetchers + seeds) @@ -294,9 +294,10 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { numblks := 1000 for i := 0; i < b.N; i++ { - net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) + net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instances := ig.Instances(numnodes)
@@ -312,9 +313,9 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { - net := tn.VirtualNetwork(mockrouting.NewServer(), d) - - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(d) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) instances := ig.Instances(numnodes) rootBlock := random.BlocksOfSize(1, rootBlockSize) @@ -327,9 +328,9 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { - net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) - - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instances := ig.Instances(numnodes) @@ -437,7 +438,7 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) { for _, p := range provs { - if err := p.Blockstore().PutMany(context.Background(), blocks); err != nil { + if err := p.Blockstore.PutMany(context.Background(), blocks); err != nil { b.Fatal(err) } } @@ -452,10 +453,10 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) bill := provs[0] jeff := provs[1] - if err := bill.Blockstore().PutMany(context.Background(), blks[:75]); err != nil { + if err := bill.Blockstore.PutMany(context.Background(), blks[:75]); err != nil { b.Fatal(err) } - if err := jeff.Blockstore().PutMany(context.Background(), blks[25:]); err != nil { + if err := jeff.Blockstore.PutMany(context.Background(), blks[25:]); err != nil { b.Fatal(err) } } @@ -473,12 +474,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) even := i%2 == 0 third := i%3 == 0 if third || even { - if err := bill.Blockstore().Put(context.Background(), blk); err != nil { + if err := bill.Blockstore.Put(context.Background(), blk); err != nil { b.Fatal(err) } } if third || !even { - if err := jeff.Blockstore().Put(context.Background(), blk); err != nil { + if err := jeff.Blockstore.Put(context.Background(), blk); err != nil { b.Fatal(err) } } @@ -490,7 +491,7 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) // but we're mostly just testing performance of the sync algorithm func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) { for _, blk := range blks { - err := provs[rand.Intn(len(provs))].Blockstore().Put(context.Background(), blk) + err := provs[rand.Intn(len(provs))].Blockstore.Put(context.Background(), blk) if err != nil { b.Fatal(err) } diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index ddc50f6dd..558eb12e0 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/ipfs/boxo/bitswap/client" - "github.com/ipfs/boxo/bitswap/internal/defaults" "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/bitswap/server" @@ -45,9 +44,8 @@ type 
bitswap interface { } var ( - _ exchange.SessionExchange = (*Bitswap)(nil) - _ bitswap = (*Bitswap)(nil) - HasBlockBufferSize = defaults.HasBlockBufferSize + _ exchange.SessionExchange = (*Bitswap)(nil) + _ bitswap = (*Bitswap)(nil) ) type Bitswap struct { @@ -58,7 +56,7 @@ type Bitswap struct { net network.BitSwapNetwork } -func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap { +func New(ctx context.Context, net network.BitSwapNetwork, providerFinder client.ProviderFinder, bstore blockstore.Blockstore, options ...Option) *Bitswap { bs := &Bitswap{ net: net, } @@ -85,14 +83,10 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc serverOptions = append(serverOptions, server.WithTracer(tracer)) } - if HasBlockBufferSize != defaults.HasBlockBufferSize { - serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize)) - } - ctx = metrics.CtxSubScope(ctx, "bitswap") bs.Server = server.New(ctx, net, bstore, serverOptions...) - bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...) + bs.Client = client.New(ctx, net, providerFinder, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...) net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once return bs @@ -115,7 +109,6 @@ type Stat struct { MessagesReceived uint64 BlocksSent uint64 DataSent uint64 - ProvideBufLen int } func (bs *Bitswap) Stat() (*Stat, error) { @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) { Peers: ss.Peers, BlocksSent: ss.BlocksSent, DataSent: ss.DataSent, - ProvideBufLen: ss.ProvideBufLen, }, nil } diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 85055879c..2fb32aa61 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -36,7 +36,7 @@ func isCI() bool { func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) { t.Helper() - err := inst.Blockstore().Put(ctx, blk) + err := inst.Blockstore.Put(ctx, blk) if err != nil { t.Fatal(err) } @@ -51,8 +51,9 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk const kNetworkDelay = 0 * time.Millisecond func TestClose(t *testing.T) { - vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() block := random.BlocksOfSize(1, blockSize)[0] bitswap := ig.Next() @@ -65,14 +66,14 @@ func TestClose(t *testing.T) { } func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this - rs := mockrouting.NewServer() - net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() block := blocks.NewBlock([]byte("block")) pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) - err := rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network + err := router.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network if err != nil { t.Fatal(err) } @@ -90,9 +91,10 @@ func 
TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this } func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -118,10 +120,11 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)} - ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) + bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)} + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, bsOpts) defer ig.Close() hasBlock := ig.Next() @@ -150,12 +153,13 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { // Tests that a received block is not stored in the blockstore if the block was // not requested by the client func TestUnwantedBlockNotAdded(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) bsMessage := bsmsg.New(true) bsMessage.AddBlock(block) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -170,9 +174,9 @@ func TestUnwantedBlockNotAdded(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage) + doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Identity.ID(), bsMessage) - blockInStore, err := doesNotWantBlock.Blockstore().Has(ctx, block.Cid()) + blockInStore, err := doesNotWantBlock.Blockstore.Has(ctx, block.Cid()) if err != nil || blockInStore { t.Fatal("Unwanted block added to block store") } @@ -186,10 +190,11 @@ func TestUnwantedBlockNotAdded(t *testing.T) { // (because the live request queue is full) func TestPendingBlockAdded(t *testing.T) { ctx := context.Background() - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) sessionBroadcastWantCapacity := 4 - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instance := ig.Instances(1)[0] @@ -277,8 +282,9 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() } - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{ + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, []bitswap.Option{ bitswap.TaskWorkerCount(5), bitswap.EngineTaskWorkerCount(5), 
bitswap.MaxOutstandingBytesPerPeer(1 << 20), @@ -333,16 +339,17 @@ func TestSendToWantingPeer(t *testing.T) { t.SkipNow() } - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() peers := ig.Instances(2) peerA := peers[0] peerB := peers[1] - t.Logf("Session %v\n", peerA.Peer) - t.Logf("Session %v\n", peerB.Peer) + t.Logf("Session %v\n", peerA.Identity.ID()) + t.Logf("Session %v\n", peerB.Identity.ID()) waitTime := time.Second * 5 @@ -370,8 +377,9 @@ func TestSendToWantingPeer(t *testing.T) { } func TestEmptyKey(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() bs := ig.Instances(1)[0].Exchange @@ -403,8 +411,9 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6 } func TestBasicBitswap(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test a one node trying to get one block from another") @@ -428,7 +437,7 @@ func TestBasicBitswap(t *testing.T) { // When second peer receives block, it should send out a cancel, so third // peer should no longer keep second peer's want if err = tu.WaitFor(ctx, func() error { - if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 { + if len(instances[2].Exchange.WantlistForPeer(instances[1].Identity.ID())) != 0 { return errors.New("should have no items in other peers wantlist") } if len(instances[1].Exchange.GetWantlist()) != 0 { @@ -474,8 +483,9 @@ func TestBasicBitswap(t *testing.T) { } func TestDoubleGet(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test a one node trying to get one block from another") @@ -518,7 +528,7 @@ func TestDoubleGet(t *testing.T) { } t.Log(blk) case <-time.After(time.Second * 5): - p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer) + p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Identity.ID()) if len(p1wl) != 1 { t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl)) } else if !p1wl[0].Equals(blocks[0].Cid()) { @@ -538,8 +548,9 @@ func TestDoubleGet(t *testing.T) { } func TestWantlistCleanup(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instances := ig.Instances(2) @@ -659,8 +670,9 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt { } func 
TestBitswapLedgerOneWay(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test ledgers match when one peer sends block to another") @@ -676,8 +688,8 @@ func TestBitswapLedgerOneWay(t *testing.T) { t.Fatal(err) } - ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) - rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + ra := instances[0].Exchange.LedgerForPeer(instances[1].Identity.ID()) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Identity.ID()) // compare peer ledger receipts err = assertLedgerMatch(ra, rb) @@ -707,8 +719,9 @@ func TestBitswapLedgerOneWay(t *testing.T) { } func TestBitswapLedgerTwoWay(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test ledgers match when two peers send one block to each other") @@ -732,8 +745,8 @@ func TestBitswapLedgerTwoWay(t *testing.T) { t.Fatal(err) } - ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) - rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + ra := instances[0].Exchange.LedgerForPeer(instances[1].Identity.ID()) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Identity.ID()) // compare peer ledger receipts err = assertLedgerMatch(ra, rb) @@ -795,9 +808,10 @@ func (tsl *testingScoreLedger) Stop() { // Tests start and stop of a custom decision logic func TestWithScoreLedger(t *testing.T) { tsl := newTestingScoreLedger() - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)} - ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) + ig := testinstance.NewTestInstanceGenerator(net, router, nil, bsOpts) defer ig.Close() i := ig.Next() defer i.Exchange.Close() diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index 6241865ef..62695de8b 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -26,12 +26,12 @@ const blockSize = 4 func getVirtualNetwork() tn.Network { // FIXME: the tests are really sensitive to the network delay. 
fix them to work // well under varying conditions - return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + return tn.VirtualNetwork(delay.Fixed(0)) } func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) { t.Helper() - err := inst.Blockstore().Put(ctx, blk) + err := inst.Blockstore.Put(ctx, blk) if err != nil { t.Fatal(err) } @@ -39,6 +39,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk if err != nil { t.Fatal(err) } + err = inst.Routing.Provide(ctx, blk.Cid(), true) + if err != nil { + t.Fatal(err) + } } func TestBasicSessions(t *testing.T) { @@ -46,7 +50,8 @@ func TestBasicSessions(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() block := random.BlocksOfSize(1, blockSize)[0] @@ -56,7 +61,7 @@ func TestBasicSessions(t *testing.T) { b := inst[1] // Add a block to Peer B - if err := b.Blockstore().Put(ctx, block); err != nil { + if err := b.Blockstore.Put(ctx, block); err != nil { t.Fatal(err) } @@ -78,7 +83,7 @@ func TestBasicSessions(t *testing.T) { t.Fatal("did not get tracable block") } - if traceBlock.From != b.Peer { + if traceBlock.From != b.Identity.ID() { t.Fatal("should have received block from peer B, did not") } } @@ -111,15 +116,16 @@ func TestSessionBetweenPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)}) + vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond)) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)}) defer ig.Close() inst := ig.Instances(10) // Add 101 blocks to Peer A blks := random.BlocksOfSize(101, blockSize) - if err := inst[0].Blockstore().PutMany(ctx, blks); err != nil { + if err := inst[0].Blockstore.PutMany(ctx, blks); err != nil { t.Fatal(err) } @@ -147,7 +153,7 @@ func TestSessionBetweenPeers(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockListsFrom(inst[0].Peer, got, blks[i*10:(i+1)*10]); err != nil { + if err := assertBlockListsFrom(inst[0].Identity.ID(), got, blks[i*10:(i+1)*10]); err != nil { t.Fatal(err) } } @@ -171,7 +177,8 @@ func TestSessionSplitFetch(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() inst := ig.Instances(11) @@ -179,7 +186,7 @@ func TestSessionSplitFetch(t *testing.T) { // Add 10 distinct blocks to each of 10 peers blks := random.BlocksOfSize(100, blockSize) for i := 0; i < 10; i++ { - if err := inst[i].Blockstore().PutMany(ctx, blks[i*10:(i+1)*10]); err != nil { + if err := inst[i].Blockstore.PutMany(ctx, blks[i*10:(i+1)*10]); err != nil { t.Fatal(err) } } @@ -203,7 +210,7 @@ func TestSessionSplitFetch(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockListsFrom(inst[i].Peer, got, blks[i*10:(i+1)*10]); err != nil { + if err := assertBlockListsFrom(inst[i].Identity.ID(), got, blks[i*10:(i+1)*10]); err != nil { t.Fatal(err) } } @@ -214,7 
+221,8 @@ func TestFetchNotConnected(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) defer ig.Close() other := ig.Next() @@ -236,7 +244,6 @@ func TestFetchNotConnected(t *testing.T) { thisNode := ig.Next() ses := thisNode.Exchange.NewSession(ctx).(*session.Session) ses.SetBaseTickDelay(time.Millisecond * 10) - ch, err := ses.GetBlocks(ctx, cids) if err != nil { t.Fatal(err) @@ -246,7 +253,7 @@ func TestFetchNotConnected(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockListsFrom(other.Peer, got, blks); err != nil { + if err := assertBlockListsFrom(other.Identity.ID(), got, blks); err != nil { t.Fatal(err) } } @@ -256,7 +263,8 @@ func TestFetchAfterDisconnect(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{ + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, []bitswap.Option{ bitswap.ProviderSearchDelay(10 * time.Millisecond), bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)), }) @@ -294,12 +302,12 @@ func TestFetchAfterDisconnect(t *testing.T) { got = append(got, b) } - if err := assertBlockListsFrom(peerA.Peer, got, blks[:5]); err != nil { + if err := assertBlockListsFrom(peerA.Identity.ID(), got, blks[:5]); err != nil { t.Fatal(err) } // Break connection - err = peerA.Adapter.DisconnectFrom(ctx, peerB.Peer) + err = peerA.Adapter.DisconnectFrom(ctx, peerB.Identity.ID()) if err != nil { t.Fatal(err) } @@ -323,7 +331,7 @@ func TestFetchAfterDisconnect(t *testing.T) { } } - if err := assertBlockListsFrom(peerA.Peer, got, blks); err != nil { + if err := assertBlockListsFrom(peerA.Identity.ID(), got, blks); err != nil { t.Fatal(err) } } @@ -333,7 +341,8 @@ func TestInterestCacheOverflow(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blks := random.BlocksOfSize(2049, blockSize) @@ -382,7 +391,8 @@ func TestPutAfterSessionCacheEvict(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blks := random.BlocksOfSize(2500, blockSize) @@ -419,7 +429,8 @@ func TestMultipleSessions(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blk := random.BlocksOfSize(1, blockSize)[0] @@ -459,7 +470,8 @@ func TestWantlistClearsOnCancel(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blks := random.BlocksOfSize(10, blockSize) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index bab03c3cd..13d8a006a 100644 --- a/bitswap/client/client.go +++ 
b/bitswap/client/client.go @@ -13,7 +13,6 @@ import ( bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue" "github.com/ipfs/boxo/bitswap/client/internal/notifications" bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager" - bspqm "github.com/ipfs/boxo/bitswap/client/internal/providerquerymanager" bssession "github.com/ipfs/boxo/bitswap/client/internal/session" bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager" bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager" @@ -26,6 +25,7 @@ import ( "github.com/ipfs/boxo/bitswap/tracer" blockstore "github.com/ipfs/boxo/blockstore" exchange "github.com/ipfs/boxo/exchange" + rpqm "github.com/ipfs/boxo/routing/providerquerymanager" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" @@ -97,6 +97,19 @@ func WithoutDuplicatedBlockStats() Option { } } +// WithDefaultProviderQueryManager indicates whether we should use the +// default ProviderQueryManager, a wrapper of the content router which +// provides bounded parallelism and limits for these lookups. The +// ProviderQueryManager set up by default uses maxInProcessRequests = 6 and +// maxProviders = 10. To use a custom ProviderQueryManager, set this to false +// and directly wrap the content router provided with the WithContentRouting() +// option. Only takes effect if WithContentRouting is set. +func WithDefaultProviderQueryManager(defaultProviderQueryManager bool) Option { + return func(bs *Client) { + bs.defaultProviderQueryManager = defaultProviderQueryManager + } +} + type BlockReceivedNotifier interface { // ReceivedBlocks notifies the decision engine that a peer is well-behaving // and gave us useful data, potentially increasing its score and making us @@ -104,8 +117,16 @@ type BlockReceivedNotifier interface { ReceivedBlocks(peer.ID, []blocks.Block) } +// ProviderFinder is a subset of +// https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.37.0/core/routing#ContentRouting +type ProviderFinder interface { + FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo +} + // New initializes a Bitswap client that runs until client.Close is called. -func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Client { +// The content providerFinder parameter can be nil to disable content-routing +// lookups for content (relying only on bitswap for discovery). +func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ProviderFinder, bstore blockstore.Blockstore, options ...Option) *Client { // important to use provided parent context (since it may include important // loggable data). It's probably not a good idea to allow bitswap to be // coupled to the concerns of the ipfs daemon in this way. @@ -115,11 +136,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore // exclusively.
We should probably find another way to share logging data ctx, cancelFunc := context.WithCancel(parent) + bs := &Client{ + network: network, + providerFinder: providerFinder, + blockstore: bstore, + cancel: cancelFunc, + closing: make(chan struct{}), + counters: new(counters), + dupMetric: bmetrics.DupHist(ctx), + allMetric: bmetrics.AllHist(ctx), + provSearchDelay: defaults.ProvSearchDelay, + rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay), + simulateDontHavesOnTimeout: true, + defaultProviderQueryManager: true, + } + + // apply functional options before starting and running bitswap + for _, option := range options { + option(bs) + } + // onDontHaveTimeout is called when a want-block is sent to a peer that // has an old version of Bitswap that doesn't support DONT_HAVE messages, // or when no response is received within a timeout. var sm *bssm.SessionManager - var bs *Client onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) { // Simulate a message arriving with DONT_HAVEs if bs.simulateDontHavesOnTimeout { @@ -133,7 +173,17 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore sim := bssim.New() bpm := bsbpm.New() pm := bspm.New(ctx, peerQueueFactory, network.Self()) - pqm := bspqm.New(ctx, network) + + if bs.providerFinder != nil && bs.defaultProviderQueryManager { + // network can do dialing. + pqm, err := rpqm.New(ctx, network, bs.providerFinder, rpqm.WithMaxProviders(10)) + if err != nil { + // Should not be possible to hit this + panic(err) + } + pqm.Startup() + bs.pqm = pqm + } sessionFactory := func( sessctx context.Context, @@ -148,6 +198,14 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore rebroadcastDelay delay.D, self peer.ID, ) bssm.Session { + // careful when bs.pqm is nil. Since we are type-casting it + // into session.ProviderFinder when passing it, it will become + // non-nil.
Related: + // https://groups.google.com/g/golang-nuts/c/wnH302gBa4I?pli=1 + var pqm bssession.ProviderFinder + if bs.pqm != nil { + pqm = bs.pqm + } return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) } sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { @@ -156,29 +214,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore notif := notifications.New() sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) - bs = &Client{ - blockstore: bstore, - network: network, - cancel: cancelFunc, - closing: make(chan struct{}), - pm: pm, - sm: sm, - sim: sim, - notif: notif, - counters: new(counters), - dupMetric: bmetrics.DupHist(ctx), - allMetric: bmetrics.AllHist(ctx), - provSearchDelay: defaults.ProvSearchDelay, - rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay), - simulateDontHavesOnTimeout: true, - } - - // apply functional options before starting and running bitswap - for _, option := range options { - option(bs) - } - - pqm.Startup() + bs.sm = sm + bs.notif = notif + bs.pm = pm + bs.sim = sim return bs } @@ -187,6 +226,12 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore type Client struct { pm *bspm.PeerManager + providerFinder ProviderFinder + + // the provider query manager manages requests to find providers + pqm *rpqm.ProviderQueryManager + defaultProviderQueryManager bool + // network delivers messages on behalf of the session network bsnet.BitSwapNetwork diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index edea20b9c..11c25089e 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -51,7 +51,7 @@ const ( // MessageNetwork is any network that can connect peers and generate a message // sender. 
type MessageNetwork interface { - ConnectTo(context.Context, peer.ID) error + Connect(context.Context, peer.AddrInfo) error NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) Latency(peer.ID) time.Duration Ping(context.Context, peer.ID) ping.Result diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go index 3a9c21309..1073a9f74 100644 --- a/bitswap/client/internal/messagequeue/messagequeue_test.go +++ b/bitswap/client/internal/messagequeue/messagequeue_test.go @@ -27,7 +27,7 @@ type fakeMessageNetwork struct { messageSender bsnet.MessageSender } -func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { +func (fmn *fakeMessageNetwork) Connect(context.Context, peer.AddrInfo) error { return fmn.connectError } diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index 6f99dec0e..3e2a9b53d 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -75,7 +75,7 @@ type SessionPeerManager interface { // ProviderFinder is used to find providers for a given key type ProviderFinder interface { // FindProvidersAsync searches for peers that provide the given CID - FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID + FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo } // opType is the kind of operation that is being processed by the event loop @@ -403,14 +403,18 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { // findMorePeers attempts to find more peers for a session by searching for // providers for the given Cid func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) { + // noop when provider finder is disabled + if s.providerFinder == nil { + return + } go func(k cid.Cid) { ctx, span := internal.StartSpan(ctx, "Session.FindMorePeers") defer span.End() - for p := range s.providerFinder.FindProvidersAsync(ctx, k) { + for p := range s.providerFinder.FindProvidersAsync(ctx, k, 0) { // When a provider indicates that it has a cid, it's equivalent to // the providing peer sending a HAVE span.AddEvent("FoundPeer") - s.sws.Update(p, nil, []cid.Cid{c}, nil) + s.sws.Update(p.ID, nil, []cid.Cid{c}, nil) } }(c) } diff --git a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go index a14fdffd0..061e298e5 100644 --- a/bitswap/client/internal/session/session_test.go +++ b/bitswap/client/internal/session/session_test.go @@ -116,7 +116,7 @@ func newFakeProviderFinder() *fakeProviderFinder { } } -func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID { +func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { go func() { select { case fpf.findMorePeersRequested <- k: @@ -124,7 +124,7 @@ func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid } }() - return make(chan peer.ID) + return make(chan peer.AddrInfo) } type wantReq struct { diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index b30bcc87f..dbcd62a31 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -20,11 +20,6 @@ const ( BitswapMaxOutstandingBytesPerPeer = 1 << 20 // the number of bytes we attempt to make each outgoing bitswap message BitswapEngineTargetMessageSize = 16 * 1024 - // HasBlockBufferSize 
is the buffer size of the channel for new blocks - // that need to be provided. They should get pulled over by the - // provideCollector even before they are actually provided. - // TODO: Does this need to be this large givent that? - HasBlockBufferSize = 256 // Maximum size of the wantlist we are willing to keep in memory. MaxQueuedWantlistEntiresPerPeer = 1024 diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go index 6ea0fc525..6c56bab14 100644 --- a/bitswap/network/interface.go +++ b/bitswap/network/interface.go @@ -40,7 +40,7 @@ type BitSwapNetwork interface { // Stop stops the network service. Stop() - ConnectTo(context.Context, peer.ID) error + Connect(context.Context, peer.AddrInfo) error DisconnectFrom(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID, *MessageSenderOpts) (MessageSender, error) @@ -49,8 +49,6 @@ type BitSwapNetwork interface { Stats() Stats - Routing - Pinger } @@ -88,7 +86,7 @@ type Receiver interface { // network. type Routing interface { // FindProvidersAsync returns a channel of providers for the given key. - FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID + FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo // Provide provides the key to the network. Provide(context.Context, cid.Cid) error diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index f01adb996..993b64429 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -11,15 +11,12 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network/internal" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-msgio" ma "github.com/multiformats/go-multiaddr" @@ -36,12 +33,11 @@ var ( ) // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. -func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork { s := processSettings(opts...) 
bitswapNetwork := impl{ - host: host, - routing: r, + host: host, protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers, protocolBitswapOneZero: s.ProtocolPrefix + ProtocolBitswapOneZero, @@ -73,7 +69,6 @@ type impl struct { stats Stats host host.Host - routing routing.ContentRouting connectEvtMgr *connectEventManager protocolBitswapNoVers protocol.ID @@ -104,7 +99,7 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout) defer cancel() - if err := s.bsnet.ConnectTo(tctx, s.to); err != nil { + if err := s.bsnet.Connect(tctx, peer.AddrInfo{ID: s.to}); err != nil { return nil, err } @@ -363,40 +358,17 @@ func (bsnet *impl) Stop() { bsnet.host.Network().StopNotify((*netNotifiee)(bsnet)) } -func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { - return bsnet.host.Connect(ctx, peer.AddrInfo{ID: p}) +func (bsnet *impl) Connect(ctx context.Context, p peer.AddrInfo) error { + if p.ID == bsnet.host.ID() { + return nil + } + return bsnet.host.Connect(ctx, p) } func (bsnet *impl) DisconnectFrom(ctx context.Context, p peer.ID) error { return bsnet.host.Network().ClosePeer(p) } -// FindProvidersAsync returns a channel of providers for the given key. -func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { - out := make(chan peer.ID, max) - go func() { - defer close(out) - providers := bsnet.routing.FindProvidersAsync(ctx, k, max) - for info := range providers { - if info.ID == bsnet.host.ID() { - continue // ignore self as provider - } - bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL) - select { - case <-ctx.Done(): - return - case out <- info.ID: - } - } - }() - return out -} - -// Provide provides the key to the network -func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error { - return bsnet.routing.Provide(ctx, k, true) -} - // handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s network.Stream) { defer s.Close() diff --git a/bitswap/network/ipfs_impl_test.go b/bitswap/network/ipfs_impl_test.go index 91e998846..bfba5709d 100644 --- a/bitswap/network/ipfs_impl_test.go +++ b/bitswap/network/ipfs_impl_test.go @@ -13,8 +13,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/bitswap/network/internal" tn "github.com/ipfs/boxo/bitswap/testnet" - mockrouting "github.com/ipfs/boxo/routing/mock" - ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-test/random" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/host" @@ -170,8 +168,7 @@ func TestMessageSendAndReceive(t *testing.T) { defer cancel() mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) + streamNet, err := tn.StreamNet(ctx, mn) if err != nil { t.Fatal("Unable to setup network") } @@ -191,7 +188,7 @@ func TestMessageSendAndReceive(t *testing.T) { if err != nil { t.Fatal(err) } - err = bsnet1.ConnectTo(ctx, p2.ID()) + err = bsnet1.Connect(ctx, peer.AddrInfo{ID: p2.ID()}) if err != nil { t.Fatal(err) } @@ -200,7 +197,7 @@ func TestMessageSendAndReceive(t *testing.T) { t.Fatal("did not connect peer") case <-r1.connectionEvent: } - err = bsnet2.ConnectTo(ctx, p1.ID()) + err = bsnet2.Connect(ctx, peer.AddrInfo{ID: p1.ID()}) if err != nil { t.Fatal(err) } @@ -275,7 +272,6 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec // create network mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() // Host 1 h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) @@ -283,8 +279,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal(err) } eh1 := &ErrHost{Host: h1} - routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) - bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1) + bsnet1 := bsnet.NewFromIpfsHost(eh1) bsnet1.Start(r1) t.Cleanup(bsnet1.Stop) if r1.listener != nil { @@ -297,8 +292,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal(err) } eh2 := &ErrHost{Host: h2} - routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore()) - bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2) + bsnet2 := bsnet.NewFromIpfsHost(eh2) bsnet2.Start(r2) t.Cleanup(bsnet2.Stop) if r2.listener != nil { @@ -310,7 +304,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec if err != nil { t.Fatal(err) } - err = bsnet1.ConnectTo(ctx, p2.ID()) + err = bsnet1.Connect(ctx, peer.AddrInfo{ID: p2.ID()}) if err != nil { t.Fatal(err) } @@ -319,7 +313,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal("Expected connect event") } - err = bsnet2.ConnectTo(ctx, p1.ID()) + err = bsnet2.Connect(ctx, peer.AddrInfo{ID: p1.ID()}) if err != nil { t.Fatal(err) } @@ -454,8 +448,7 @@ func TestSupportsHave(t *testing.T) { ctx := context.Background() mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) + streamNet, err := tn.StreamNet(ctx, mn) if err != nil { t.Fatalf("Unable to setup network: %s", err) } diff --git a/bitswap/options.go b/bitswap/options.go index 6a98b27db..736b58914 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option { return Option{server.TaskWorkerCount(count)} } -func ProvideEnabled(enabled bool) Option { - return 
Option{server.ProvideEnabled(enabled)} -} - func SetSendDontHaves(send bool) Option { return Option{server.SetSendDontHaves(send)} } @@ -106,3 +102,11 @@ func WithTracer(tap tracer.Tracer) Option { }), } } + +func WithClientOption(opt client.Option) Option { + return Option{opt} +} + +func WithServerOption(opt server.Option) Option { + return Option{opt} +} diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 6416da034..5bb277dfb 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -24,15 +24,11 @@ import ( "go.uber.org/zap" ) -var provideKeysBufferSize = 2048 - var ( log = logging.Logger("bitswap/server") sflog = log.Desugar() ) -const provideWorkerMax = 6 - type Option func(*Server) type Server struct { @@ -62,37 +58,21 @@ type Server struct { // waitWorkers waits for all worker goroutines to exit. waitWorkers sync.WaitGroup - // newBlocks is a channel for newly added blocks to be provided to the - // network. blocks pushed down this channel get buffered and fed to the - // provideKeys channel later on to avoid too much network activity - newBlocks chan cid.Cid - // provideKeys directly feeds provide workers - provideKeys chan cid.Cid - // Extra options to pass to the decision manager engineOptions []decision.Option - - // the size of channel buffer to use - hasBlockBufferSize int - // whether or not to make provide announcements - provideEnabled bool } func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server { ctx, cancel := context.WithCancel(ctx) s := &Server{ - sentHistogram: bmetrics.SentHist(ctx), - sendTimeHistogram: bmetrics.SendTimeHist(ctx), - taskWorkerCount: defaults.BitswapTaskWorkerCount, - network: network, - cancel: cancel, - closing: make(chan struct{}), - provideEnabled: true, - hasBlockBufferSize: defaults.HasBlockBufferSize, - provideKeys: make(chan cid.Cid, provideKeysBufferSize), + sentHistogram: bmetrics.SentHist(ctx), + sendTimeHistogram: bmetrics.SendTimeHist(ctx), + taskWorkerCount: defaults.BitswapTaskWorkerCount, + network: network, + cancel: cancel, + closing: make(chan struct{}), } - s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize) for _, o := range options { o(s) @@ -127,13 +107,6 @@ func WithTracer(tap tracer.Tracer) Option { } } -// ProvideEnabled is an option for enabling/disabling provide announcements -func ProvideEnabled(enabled bool) Option { - return func(bs *Server) { - bs.provideEnabled = enabled - } -} - func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option { o := decision.WithPeerBlockRequestFilter(pbrf) return func(bs *Server) { @@ -237,16 +210,6 @@ func MaxCidSize(n uint) Option { } } -// HasBlockBufferSize configure how big the new blocks buffer should be. -func HasBlockBufferSize(count int) Option { - if count < 0 { - panic("cannot have negative buffer size") - } - return func(bs *Server) { - bs.hasBlockBufferSize = count - } -} - // WithWantHaveReplaceSize sets the maximum size of a block in bytes up to // which the bitswap server will replace a WantHave with a WantBlock response. 
// @@ -296,12 +259,6 @@ func (bs *Server) startWorkers(ctx context.Context) { i := i go bs.taskWorker(ctx, i) } - - if bs.provideEnabled { - bs.waitWorkers.Add(1) - go bs.provideCollector(ctx) - bs.startProvideWorkers(ctx) - } } func (bs *Server) taskWorker(ctx context.Context, id int) { @@ -410,10 +367,9 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) { } type Stat struct { - Peers []string - ProvideBufLen int - BlocksSent uint64 - DataSent uint64 + Peers []string + BlocksSent uint64 + DataSent uint64 } // Stat returns aggregated statistics about bitswap operations @@ -421,7 +377,6 @@ func (bs *Server) Stat() (Stat, error) { bs.counterLk.Lock() s := bs.counters bs.counterLk.Unlock() - s.ProvideBufLen = len(bs.newBlocks) peers := bs.engine.Peers() peersStr := make([]string, len(peers)) @@ -448,84 +403,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err // Send wanted blocks to decision engine bs.engine.NotifyNewBlocks(blks) - // If the reprovider is enabled, send block to reprovider - if bs.provideEnabled { - for _, blk := range blks { - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.closing: - return nil - } - } - } - return nil } -func (bs *Server) provideCollector(ctx context.Context) { - defer bs.waitWorkers.Done() - defer close(bs.provideKeys) - var toProvide []cid.Cid - var nextKey cid.Cid - var keysOut chan cid.Cid - - for { - select { - case blkey, ok := <-bs.newBlocks: - if !ok { - log.Debug("newBlocks channel closed") - return - } - - if keysOut == nil { - nextKey = blkey - keysOut = bs.provideKeys - } else { - toProvide = append(toProvide, blkey) - } - case keysOut <- nextKey: - if len(toProvide) > 0 { - nextKey = toProvide[0] - toProvide = toProvide[1:] - } else { - keysOut = nil - } - case <-ctx.Done(): - return - } - } -} - -// startProvideWorkers starts provide worker goroutines that provide CID -// supplied by provideCollector. -// -// If providing blocks bottlenecks file transfers then consider increasing -// provideWorkerMax, -func (bs *Server) startProvideWorkers(ctx context.Context) { - bs.waitWorkers.Add(provideWorkerMax) - for id := 0; id < provideWorkerMax; id++ { - go func(wid int) { - defer bs.waitWorkers.Done() - - var runCount int - // Read bs.proviudeKeys until closed, when provideCollector exits. - for k := range bs.provideKeys { - runCount++ - log.Debugw("Bitswap provider worker start", "ID", wid, "run", runCount, "cid", k) - - ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) - if err := bs.network.Provide(ctx, k); err != nil { - log.Warn(err) - } - cancel() - - log.Debugw("Bitswap provider worker done", "ID", wid, "run", runCount, "cid", k) - } - }(id) - } -} - func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. 
diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index 5a052b831..f09831b65 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -8,6 +8,7 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" tn "github.com/ipfs/boxo/bitswap/testnet" blockstore "github.com/ipfs/boxo/blockstore" + mockrouting "github.com/ipfs/boxo/routing/mock" ds "github.com/ipfs/go-datastore" delayed "github.com/ipfs/go-datastore/delayed" ds_sync "github.com/ipfs/go-datastore/sync" @@ -15,11 +16,12 @@ import ( tnet "github.com/libp2p/go-libp2p-testing/net" p2ptestutil "github.com/libp2p/go-libp2p-testing/netutil" peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" ) // NewTestInstanceGenerator generates a new InstanceGenerator for the given // testnet -func NewTestInstanceGenerator(net tn.Network, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) InstanceGenerator { +func NewTestInstanceGenerator(net tn.Network, routing mockrouting.Server, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) InstanceGenerator { ctx, cancel := context.WithCancel(context.Background()) return InstanceGenerator{ net: net, @@ -28,6 +30,7 @@ func NewTestInstanceGenerator(net tn.Network, netOptions []bsnet.NetOpt, bsOptio cancel: cancel, bsOptions: bsOptions, netOptions: netOptions, + routing: routing, } } @@ -39,6 +42,7 @@ type InstanceGenerator struct { cancel context.CancelFunc bsOptions []bitswap.Option netOptions []bsnet.NetOpt + routing mockrouting.Server } // Close closes the clobal context, shutting down all test instances @@ -54,7 +58,7 @@ func (g *InstanceGenerator) Next() Instance { if err != nil { panic("FIXME") // TODO change signature } - return NewInstance(g.ctx, g.net, p, g.netOptions, g.bsOptions) + return NewInstance(g.ctx, g.net, g.routing.Client(p), p, g.netOptions, g.bsOptions) } // Instances creates N test instances of bitswap + dependencies and connects @@ -74,7 +78,7 @@ func ConnectInstances(instances []Instance) { for i, inst := range instances { for j := i + 1; j < len(instances); j++ { oinst := instances[j] - err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer) + err := inst.Adapter.Connect(context.Background(), peer.AddrInfo{ID: oinst.Identity.ID()}) if err != nil { panic(err.Error()) } @@ -84,18 +88,15 @@ func ConnectInstances(instances []Instance) { // Instance is a test instance of bitswap + dependencies for integration testing type Instance struct { - Peer peer.ID + Identity tnet.Identity + Datastore ds.Batching Exchange *bitswap.Bitswap - blockstore blockstore.Blockstore + Blockstore blockstore.Blockstore Adapter bsnet.BitSwapNetwork + Routing routing.Routing blockstoreDelay delay.D } -// Blockstore returns the block store for this test instance -func (i *Instance) Blockstore() blockstore.Blockstore { - return i.blockstore -} - // SetBlockstoreLatency customizes the artificial delay on receiving blocks // from a blockstore test instance. func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { @@ -107,26 +108,28 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // instances. To safeguard, use the InstanceGenerator to generate instances. It's // just a much better idea. 
-func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) Instance { +func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p tnet.Identity, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) Instance { bsdelay := delay.Fixed(0) adapter := net.Adapter(p, netOptions...) dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) + ds := ds_sync.MutexWrap(dstore) bstore, err := blockstore.CachedBlockstore(ctx, - blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), + blockstore.NewBlockstore(ds), blockstore.DefaultCacheOpts()) if err != nil { panic(err.Error()) // FIXME perhaps change signature and return error. } - bs := bitswap.New(ctx, adapter, bstore, bsOptions...) - + bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...) return Instance{ + Datastore: ds, Adapter: adapter, - Peer: p.ID(), + Identity: p, Exchange: bs, - blockstore: bstore, + Routing: router, + Blockstore: bstore, blockstoreDelay: bsdelay, } } diff --git a/bitswap/testnet/network_test.go b/bitswap/testnet/network_test.go index 0947eff3e..2d45e09b1 100644 --- a/bitswap/testnet/network_test.go +++ b/bitswap/testnet/network_test.go @@ -8,7 +8,6 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" delay "github.com/ipfs/go-ipfs-delay" @@ -17,7 +16,7 @@ import ( ) func TestSendMessageAsyncButWaitForResponse(t *testing.T) { - net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + net := VirtualNetwork(delay.Fixed(0)) responderPeer := tnet.RandIdentityOrFatal(t) waiter := net.Adapter(tnet.RandIdentityOrFatal(t)) responder := net.Adapter(responderPeer) diff --git a/bitswap/testnet/peernet.go b/bitswap/testnet/peernet.go index e4df19699..84fa70c6e 100644 --- a/bitswap/testnet/peernet.go +++ b/bitswap/testnet/peernet.go @@ -5,9 +5,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" - ds "github.com/ipfs/go-datastore" - tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -15,12 +12,11 @@ import ( type peernet struct { mockpeernet.Mocknet - routingserver mockrouting.Server } // StreamNet is a testnet that uses libp2p's MockNet -func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) { - return &peernet{net, rs}, nil +func StreamNet(ctx context.Context, net mockpeernet.Mocknet) (Network, error) { + return &peernet{net}, nil } func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { @@ -28,8 +24,8 @@ func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapN if err != nil { panic(err.Error()) } - routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) - return bsnet.NewFromIpfsHost(client, routing, opts...) + + return bsnet.NewFromIpfsHost(client, opts...) 
} func (pn *peernet) HasPeer(p peer.ID) bool { diff --git a/bitswap/testnet/virtual.go b/bitswap/testnet/virtual.go index 914044aed..0acf083a9 100644 --- a/bitswap/testnet/virtual.go +++ b/bitswap/testnet/virtual.go @@ -11,27 +11,23 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" - cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" protocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/routing" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) // VirtualNetwork generates a new testnet instance - a fake network that // is used to simulate sending messages. -func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { +func VirtualNetwork(d delay.D) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), clients: make(map[peer.ID]*receiverQueue), delay: d, - routingserver: rs, isRateLimited: false, rateLimitGenerator: nil, conns: make(map[string]struct{}), @@ -45,13 +41,12 @@ type RateLimitGenerator interface { // RateLimitedVirtualNetwork generates a testnet instance where nodes are rate // limited in the upload/download speed. -func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network { +func RateLimitedVirtualNetwork(d delay.D, rateLimitGenerator RateLimitGenerator) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), rateLimiters: make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter), clients: make(map[peer.ID]*receiverQueue), delay: d, - routingserver: rs, isRateLimited: true, rateLimitGenerator: rateLimitGenerator, conns: make(map[string]struct{}), @@ -63,7 +58,6 @@ type network struct { latencies map[peer.ID]map[peer.ID]time.Duration rateLimiters map[peer.ID]map[peer.ID]*mocknet.RateLimiter clients map[peer.ID]*receiverQueue - routingserver mockrouting.Server delay delay.D isRateLimited bool rateLimitGenerator RateLimitGenerator @@ -105,7 +99,6 @@ func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNe client := &networkClient{ local: p.ID(), network: n, - routing: n.routingserver.Client(p), supportedProtocols: s.SupportedProtocols, } n.clients[p.ID()] = &receiverQueue{receiver: client} @@ -192,7 +185,6 @@ type networkClient struct { local peer.ID receivers []bsnet.Receiver network *network - routing routing.Routing supportedProtocols []protocol.ID } @@ -253,27 +245,6 @@ func (nc *networkClient) Stats() bsnet.Stats { } } -// FindProvidersAsync returns a channel of providers for the given key. -func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { - // NB: this function duplicates the AddrInfo -> ID transformation in the - // bitswap network adapter. Not to worry. This network client will be - // deprecated once the ipfsnet.Mock is added. The code below is only - // temporary. 
- - out := make(chan peer.ID) - go func() { - defer close(out) - providers := nc.routing.FindProvidersAsync(ctx, k, max) - for info := range providers { - select { - case <-ctx.Done(): - case out <- info.ID: - } - } - }() - return out -} - func (nc *networkClient) ConnectionManager() connmgr.ConnManager { return &connmgr.NullConnMgr{} } @@ -322,11 +293,6 @@ func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts * }, nil } -// Provide provides the key to the network. -func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error { - return nc.routing.Provide(ctx, k, true) -} - func (nc *networkClient) Start(r ...bsnet.Receiver) { nc.receivers = r } @@ -334,15 +300,15 @@ func (nc *networkClient) Start(r ...bsnet.Receiver) { func (nc *networkClient) Stop() { } -func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { +func (nc *networkClient) Connect(_ context.Context, p peer.AddrInfo) error { nc.network.mu.Lock() - otherClient, ok := nc.network.clients[p] + otherClient, ok := nc.network.clients[p.ID] if !ok { nc.network.mu.Unlock() return errors.New("no such peer in network") } - tag := tagForPeers(nc.local, p) + tag := tagForPeers(nc.local, p.ID) if _, ok := nc.network.conns[tag]; ok { nc.network.mu.Unlock() // log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)") @@ -352,7 +318,7 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { nc.network.mu.Unlock() otherClient.receiver.PeerConnected(nc.local) - nc.PeerConnected(p) + nc.PeerConnected(p.ID) return nil } diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index e32b10b99..77eeed127 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -10,14 +10,15 @@ import ( // Mocks returns |n| connected mock Blockservices func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) - sg := testinstance.NewTestInstanceGenerator(net, nil, nil) - + net := tn.VirtualNetwork(delay.Fixed(0)) + routing := mockrouting.NewServer() + sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) instances := sg.Instances(n) var servs []blockservice.BlockService for _, i := range instances { - servs = append(servs, blockservice.New(i.Blockstore(), i.Exchange, opts...)) + servs = append(servs, blockservice.New(i.Blockstore, + i.Exchange, opts...)) } return servs } diff --git a/examples/bitswap-transfer/main.go b/examples/bitswap-transfer/main.go index 921dca3fa..fc2d5ded3 100644 --- a/examples/bitswap-transfer/main.go +++ b/examples/bitswap-transfer/main.go @@ -32,7 +32,6 @@ import ( unixfile "github.com/ipfs/boxo/ipld/unixfs/file" "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" bsclient "github.com/ipfs/boxo/bitswap/client" bsnet "github.com/ipfs/boxo/bitswap/network" @@ -178,15 +177,15 @@ func startDataServer(ctx context.Context, h host.Host) (cid.Cid, *bsserver.Serve // Start listening on the Bitswap protocol // For this example we're not leveraging any content routing (DHT, IPNI, delegated routing requests, etc.) 
as we know the peer we are fetching from
-	n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{})
+	n := bsnet.NewFromIpfsHost(h)
 	bswap := bsserver.New(ctx, n, bs)
 	n.Start(bswap)
 	return nd.Cid(), bswap, nil
 }
 func runClient(ctx context.Context, h host.Host, c cid.Cid, targetPeer string) ([]byte, error) {
-	n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{})
-	bswap := bsclient.New(ctx, n, blockstore.NewBlockstore(datastore.NewNullDatastore()))
+	n := bsnet.NewFromIpfsHost(h)
+	bswap := bsclient.New(ctx, n, nil, blockstore.NewBlockstore(datastore.NewNullDatastore()))
 	n.Start(bswap)
 	defer bswap.Close()
diff --git a/examples/go.mod b/examples/go.mod
index ac7a5343e..c7c389626 100644
--- a/examples/go.mod
+++ b/examples/go.mod
@@ -12,7 +12,6 @@ require (
 	github.com/ipld/go-car/v2 v2.14.2
 	github.com/ipld/go-ipld-prime v0.21.0
 	github.com/libp2p/go-libp2p v0.37.0
-	github.com/libp2p/go-libp2p-routing-helpers v0.7.4
 	github.com/multiformats/go-multiaddr v0.13.0
 	github.com/multiformats/go-multicodec v0.9.0
 	github.com/prometheus/client_golang v1.20.5
@@ -95,6 +94,7 @@ require (
 	github.com/libp2p/go-libp2p-kad-dht v0.27.0 // indirect
 	github.com/libp2p/go-libp2p-kbucket v0.6.4 // indirect
 	github.com/libp2p/go-libp2p-record v0.2.0 // indirect
+	github.com/libp2p/go-libp2p-routing-helpers v0.7.4 // indirect
 	github.com/libp2p/go-msgio v0.3.0 // indirect
 	github.com/libp2p/go-nat v0.2.0 // indirect
 	github.com/libp2p/go-netroute v0.2.1 // indirect
diff --git a/exchange/providing/providing.go b/exchange/providing/providing.go
new file mode 100644
index 000000000..6b2887858
--- /dev/null
+++ b/exchange/providing/providing.go
@@ -0,0 +1,46 @@
+// Package providing implements an exchange wrapper that
+// does content providing for new blocks.
+package providing
+
+import (
+	"context"
+
+	"github.com/ipfs/boxo/exchange"
+	"github.com/ipfs/boxo/provider"
+	blocks "github.com/ipfs/go-block-format"
+)
+
+// Exchange is an exchange wrapper that calls Provide for blocks received
+// via NotifyNewBlocks.
+type Exchange struct {
+	exchange.Interface
+	provider provider.Provider
+}
+
+// New creates a new providing Exchange with the given exchange and provider.
+// This is a light wrapper: the provider is called directly for every new
+// block, so it should be an implementation that can handle many concurrent
+// provides.
+func New(base exchange.Interface, provider provider.Provider) *Exchange {
+	return &Exchange{
+		Interface: base,
+		provider:  provider,
+	}
+}
+
+// NotifyNewBlocks calls NotifyNewBlocks on the underlying exchange and then
+// calls provider.Provide for every block.
+func (ex *Exchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
+	// Notify blocks on the underlying exchange.
+	err := ex.Interface.NotifyNewBlocks(ctx, blocks...)
+	if err != nil {
+		return err
+	}
+
+	for _, b := range blocks {
+		if err := ex.provider.Provide(ctx, b.Cid(), true); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/exchange/providing/providing_test.go b/exchange/providing/providing_test.go
new file mode 100644
index 000000000..42a2a0cb9
--- /dev/null
+++ b/exchange/providing/providing_test.go
@@ -0,0 +1,74 @@
+package providing
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	testinstance "github.com/ipfs/boxo/bitswap/testinstance"
+	tn "github.com/ipfs/boxo/bitswap/testnet"
+	"github.com/ipfs/boxo/blockservice"
+	"github.com/ipfs/boxo/provider"
+	mockrouting "github.com/ipfs/boxo/routing/mock"
+	delay "github.com/ipfs/go-ipfs-delay"
+	"github.com/ipfs/go-test/random"
+)
+
+func TestExchange(t *testing.T) {
+	ctx := context.Background()
+	net := tn.VirtualNetwork(delay.Fixed(0))
+	routing := mockrouting.NewServer()
+	sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil)
+	i := sg.Next()
+	provFinder := routing.Client(i.Identity)
+	prov, err := provider.New(i.Datastore,
+		provider.Online(provFinder),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	provExchange := New(i.Exchange, prov)
+	// write-through so that we notify when re-adding the block
+	bs := blockservice.New(i.Blockstore, provExchange,
+		blockservice.WriteThrough())
+	block := random.BlocksOfSize(1, 10)[0]
+	// Put it on the blockstore of the first instance.
+	err = i.Blockstore.Put(ctx, block)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Trigger reproviding, otherwise it's not really provided.
+	err = prov.Reprovide(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(200 * time.Millisecond)
+
+	providersChan := provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
+	_, ok := <-providersChan
+	if ok {
+		t.Fatal("there should be no providers yet for block")
+	}
+
+	// Now add it via the BlockService. It should trigger NotifyNewBlocks
+	// on the exchange and thus the block should get announced.
+	err = bs.AddBlock(ctx, block)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Trigger reproviding, otherwise it's not really provided.
+	err = prov.Reprovide(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(200 * time.Millisecond)
+
+	providersChan = provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
+	_, ok = <-providersChan
+	if !ok {
+		t.Fatal("there should be one provider for the block")
+	}
+}
diff --git a/fetcher/helpers/block_visitor_test.go b/fetcher/helpers/block_visitor_test.go
index 57d3e11ad..9ea0eacd9 100644
--- a/fetcher/helpers/block_visitor_test.go
+++ b/fetcher/helpers/block_visitor_test.go
@@ -44,8 +44,9 @@ func TestFetchGraphToBlocks(t *testing.T) {
 		})
 	}))
-	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond))
-	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
+	routing := mockrouting.NewServer()
+	net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond))
+	ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil)
 	defer ig.Close()
 	peers := ig.Instances(2)
@@ -53,7 +54,7 @@ func TestFetchGraphToBlocks(t *testing.T) {
 	defer hasBlock.Exchange.Close()
 	blocks := []blocks.Block{block1, block2, block3, block4}
-	err := hasBlock.Blockstore().PutMany(bg, blocks)
+	err := hasBlock.Blockstore.PutMany(bg, blocks)
 	require.NoError(t, err)
 	err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
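The test above needs a write-through BlockService because, by default, AddBlock skips NotifyNewBlocks for blocks already present in the blockstore, so a re-added block would never reach the providing wrapper. A hedged sketch of the wiring, where `newProvidingBlockService` is an illustrative helper and `bstore`, `base`, and `prov` are assumed to exist already:

```go
package example

import (
	"github.com/ipfs/boxo/blockservice"
	blockstore "github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/boxo/exchange"
	"github.com/ipfs/boxo/exchange/providing"
	provider "github.com/ipfs/boxo/provider"
)

// newProvidingBlockService wraps a base exchange so that every block added
// through the returned BlockService is also announced via the provider.
func newProvidingBlockService(bstore blockstore.Blockstore, base exchange.Interface, prov provider.Provider) blockservice.BlockService {
	exch := providing.New(base, prov)
	// WriteThrough skips the "already have it" check on AddBlock, so
	// NotifyNewBlocks fires even when a block is re-added.
	return blockservice.New(bstore, exch, blockservice.WriteThrough())
}
```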
require.NoError(t, err) @@ -61,7 +62,7 @@ func TestFetchGraphToBlocks(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -94,15 +95,16 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) hasBlock := peers[0] defer hasBlock.Exchange.Close() - err := hasBlock.Blockstore().PutMany(bg, []blocks.Block{block1, block2, block3}) + err := hasBlock.Blockstore.PutMany(bg, []blocks.Block{block1, block2, block3}) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, block1, block2, block3) @@ -111,7 +113,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) diff --git a/fetcher/impl/blockservice/fetcher_test.go b/fetcher/impl/blockservice/fetcher_test.go index 5a0b071f4..55c1d5c21 100644 --- a/fetcher/impl/blockservice/fetcher_test.go +++ b/fetcher/impl/blockservice/fetcher_test.go @@ -38,15 +38,16 @@ func TestFetchIPLDPrimeNode(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) hasBlock := peers[0] defer hasBlock.Exchange.Close() - err := hasBlock.Blockstore().Put(bg, block) + err := hasBlock.Blockstore.Put(bg, block) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, block) @@ -55,7 +56,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) @@ -87,8 +88,9 @@ func TestFetchIPLDGraph(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -96,7 +98,7 @@ func TestFetchIPLDGraph(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block1, block2, block3, block4} - err := 
hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -104,7 +106,7 @@ func TestFetchIPLDGraph(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -143,8 +145,9 @@ func TestFetchIPLDPath(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -152,7 +155,7 @@ func TestFetchIPLDPath(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block1, block2, block3, block4, block5} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -160,7 +163,7 @@ func TestFetchIPLDPath(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -206,9 +209,9 @@ func TestHelpers(t *testing.T) { na.AssembleEntry("nonlink").AssignString("zoo") }) })) - - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -216,7 +219,7 @@ func TestHelpers(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block1, block2, block3, block4} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) 
require.NoError(t, err) @@ -224,7 +227,7 @@ func TestHelpers(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) t.Run("Block retrieves node", func(t *testing.T) { fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) @@ -321,8 +324,9 @@ func TestNodeReification(t *testing.T) { na.AssembleEntry("link4").AssignLink(link4) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -330,7 +334,7 @@ func TestNodeReification(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block2, block3, block4} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -338,7 +342,7 @@ func TestNodeReification(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) nodeReifier := func(lnkCtx ipld.LinkContext, nd ipld.Node, ls *ipld.LinkSystem) (ipld.Node, error) { return &selfLoader{Node: nd, ctx: lnkCtx.Ctx, ls: ls}, nil diff --git a/provider/noop.go b/provider/noop.go index 5367ccb30..50c3e3502 100644 --- a/provider/noop.go +++ b/provider/noop.go @@ -19,7 +19,7 @@ func (op *noopProvider) Close() error { return nil } -func (op *noopProvider) Provide(cid.Cid) error { +func (op *noopProvider) Provide(context.Context, cid.Cid, bool) error { return nil } diff --git a/provider/provider.go b/provider/provider.go index a20a805cb..4197f3dae 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -18,7 +18,7 @@ var logR = logging.Logger("reprovider.simple") // Provider announces blocks to the network type Provider interface { // Provide takes a cid and makes an attempt to announce it to the network - Provide(cid.Cid) error + Provide(context.Context, cid.Cid, bool) error } // Reprovider reannounces blocks to the network diff --git a/provider/reprovider.go b/provider/reprovider.go index 219bacc75..048a2067d 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -455,7 +455,7 @@ func (s *reprovider) Close() error { return err } -func (s *reprovider) Provide(cid cid.Cid) error { +func (s *reprovider) Provide(ctx context.Context, cid cid.Cid, announce bool) error { return s.q.Enqueue(cid) } diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go index 4ae58148e..ceb72f97b 100644 --- a/provider/reprovider_test.go +++ b/provider/reprovider_test.go @@ -198,7 +198,7 @@ func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { sys, err := New(ds) assert.NoError(t, err) - err = sys.Provide(c) + err = sys.Provide(context.Background(), c, true) assert.NoError(t, err) err = sys.Close() diff --git a/routing/mock/centralized_client.go b/routing/mock/centralized_client.go index 02c68d100..2c2135bb8 100644 --- a/routing/mock/centralized_client.go +++ b/routing/mock/centralized_client.go @@ -47,11 +47,12 @@ func (c *client) 
FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, erro } func (c *client) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { + log.Debugf("FindProvidersAsync: %s %d", k, max) out := make(chan peer.AddrInfo) go func() { defer close(out) for i, p := range c.server.Providers(k) { - if max <= i { + if max > 0 && max <= i { return } select { diff --git a/routing/mock/centralized_server.go b/routing/mock/centralized_server.go index d55de7081..85c768814 100644 --- a/routing/mock/centralized_server.go +++ b/routing/mock/centralized_server.go @@ -39,7 +39,7 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error { rs.lock.Lock() defer rs.lock.Unlock() - k := c.KeyString() + k := c.Hash().String() _, ok := rs.providers[k] if !ok { @@ -54,16 +54,16 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error { func (rs *s) Providers(c cid.Cid) []peer.AddrInfo { rs.delayConf.Query.Wait() // before locking - rs.lock.RLock() defer rs.lock.RUnlock() - k := c.KeyString() + k := c.Hash().String() var ret []peer.AddrInfo records, ok := rs.providers[k] if !ok { return ret } + for _, r := range records { if time.Since(r.Created) > rs.delayConf.ValueVisibility.Get() { ret = append(ret, r.Peer) @@ -74,7 +74,6 @@ func (rs *s) Providers(c cid.Cid) []peer.AddrInfo { j := rand.Intn(i + 1) ret[i], ret[j] = ret[j], ret[i] } - return ret } diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/routing/providerquerymanager/providerquerymanager.go similarity index 69% rename from bitswap/client/internal/providerquerymanager/providerquerymanager.go rename to routing/providerquerymanager/providerquerymanager.go index c85efe737..d9005020e 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/routing/providerquerymanager/providerquerymanager.go @@ -5,27 +5,28 @@ import ( "sync" "time" - "github.com/ipfs/boxo/bitswap/client/internal" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p/core/peer" + swarm "github.com/libp2p/go-libp2p/p2p/net/swarm" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) -var log = logging.Logger("bitswap/client/provqrymgr") +var log = logging.Logger("routing/provqrymgr") const ( - maxProviders = 10 - maxInProcessRequests = 6 - defaultTimeout = 10 * time.Second + defaultMaxInProcessRequests = 6 + defaultMaxProviders = 0 + defaultTimeout = 10 * time.Second ) type inProgressRequestStatus struct { ctx context.Context cancelFn func() - providersSoFar []peer.ID - listeners map[chan peer.ID]struct{} + providersSoFar []peer.AddrInfo + listeners map[chan peer.AddrInfo]struct{} } type findProviderRequest struct { @@ -33,11 +34,16 @@ type findProviderRequest struct { ctx context.Context } -// ProviderQueryNetwork is an interface for finding providers and connecting to -// peers. -type ProviderQueryNetwork interface { - ConnectTo(context.Context, peer.ID) error - FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID +// ProviderQueryDialer is an interface for connecting to peers. Usually a +// libp2p.Host +type ProviderQueryDialer interface { + Connect(context.Context, peer.AddrInfo) error +} + +// ProviderQueryRouter is an interface for finding providers. Usually a libp2p +// ContentRouter. 
+type ProviderQueryRouter interface { + FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo } type providerQueryMessage interface { @@ -48,7 +54,7 @@ type providerQueryMessage interface { type receivedProviderMessage struct { ctx context.Context k cid.Cid - p peer.ID + p peer.AddrInfo } type finishedProviderQueryMessage struct { @@ -64,7 +70,7 @@ type newProvideQueryMessage struct { type cancelRequestMessage struct { ctx context.Context - incomingProviders chan peer.ID + incomingProviders chan peer.AddrInfo k cid.Cid } @@ -77,7 +83,8 @@ type cancelRequestMessage struct { // - manage timeouts type ProviderQueryManager struct { ctx context.Context - network ProviderQueryNetwork + dialer ProviderQueryDialer + router ProviderQueryRouter providerQueryMessages chan providerQueryMessage providerRequestsProcessing chan *findProviderRequest incomingFindProviderRequests chan *findProviderRequest @@ -85,22 +92,63 @@ type ProviderQueryManager struct { findProviderTimeout time.Duration timeoutMutex sync.RWMutex + maxProviders int + maxInProcessRequests int + // do not touch outside the run loop inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus } +type Option func(*ProviderQueryManager) error + +func WithMaxTimeout(timeout time.Duration) Option { + return func(mgr *ProviderQueryManager) error { + mgr.findProviderTimeout = timeout + return nil + } +} + +// WithMaxInProcessRequests is the maximum number of requests that can be processed in parallel +func WithMaxInProcessRequests(count int) Option { + return func(mgr *ProviderQueryManager) error { + mgr.maxInProcessRequests = count + return nil + } +} + +// WithMaxProviders is the maximum number of providers that will be looked up +// per query. We only return providers that we can connect to. Defaults to 0, +// which means unbounded. +func WithMaxProviders(count int) Option { + return func(mgr *ProviderQueryManager) error { + mgr.maxProviders = count + return nil + } +} + // New initializes a new ProviderQueryManager for a given context and a given // network provider. -func New(ctx context.Context, network ProviderQueryNetwork) *ProviderQueryManager { - return &ProviderQueryManager{ +func New(ctx context.Context, dialer ProviderQueryDialer, router ProviderQueryRouter, opts ...Option) (*ProviderQueryManager, error) { + pqm := &ProviderQueryManager{ ctx: ctx, - network: network, + dialer: dialer, + router: router, providerQueryMessages: make(chan providerQueryMessage, 16), providerRequestsProcessing: make(chan *findProviderRequest), incomingFindProviderRequests: make(chan *findProviderRequest), inProgressRequestStatuses: make(map[cid.Cid]*inProgressRequestStatus), findProviderTimeout: defaultTimeout, + maxInProcessRequests: defaultMaxInProcessRequests, + maxProviders: defaultMaxProviders, + } + + for _, o := range opts { + if err := o(pqm); err != nil { + return nil, err + } } + + return pqm, nil } // Startup starts processing for the ProviderQueryManager. 
@@ -109,23 +157,30 @@ func (pqm *ProviderQueryManager) Startup() { } type inProgressRequest struct { - providersSoFar []peer.ID - incoming chan peer.ID + providersSoFar []peer.AddrInfo + incoming chan peer.AddrInfo } -// SetFindProviderTimeout changes the timeout for finding providers -func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time.Duration) { +// setFindProviderTimeout changes the timeout for finding providers +func (pqm *ProviderQueryManager) setFindProviderTimeout(findProviderTimeout time.Duration) { pqm.timeoutMutex.Lock() pqm.findProviderTimeout = findProviderTimeout pqm.timeoutMutex.Unlock() } -// FindProvidersAsync finds providers for the given block. -func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, k cid.Cid) <-chan peer.ID { +// FindProvidersAsync finds providers for the given block. The max parameter +// controls how many will be returned at most. For a provider to be returned, +// we must have successfully connected to it. Setting max to 0 will use the +// configured MaxProviders which defaults to 0 (unbounded). +func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { + if max == 0 { + max = pqm.maxProviders + } + inProgressRequestChan := make(chan inProgressRequest) var span trace.Span - sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k))) + sessionCtx, span = otel.Tracer("routing").Start(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k))) select { case pqm.providerQueryMessages <- &newProvideQueryMessage{ @@ -134,12 +189,12 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, inProgressRequestChan: inProgressRequestChan, }: case <-pqm.ctx.Done(): - ch := make(chan peer.ID) + ch := make(chan peer.AddrInfo) close(ch) span.End() return ch case <-sessionCtx.Done(): - ch := make(chan peer.ID) + ch := make(chan peer.AddrInfo) close(ch) return ch } @@ -150,41 +205,59 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, var receivedInProgressRequest inProgressRequest select { case <-pqm.ctx.Done(): - ch := make(chan peer.ID) + ch := make(chan peer.AddrInfo) close(ch) span.End() return ch case receivedInProgressRequest = <-inProgressRequestChan: } - return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest, func() { span.End() }) + return pqm.receiveProviders(sessionCtx, k, max, receivedInProgressRequest, func() { span.End() }) } -func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.ID { +func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, max int, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.AddrInfo { // maintains an unbuffered queue for incoming providers for given request for a given session // essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all // sessions that queried that CID, without worrying about whether the client code is actually // reading from the returned channel -- so that the broadcast never blocks // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - returnedProviders := make(chan peer.ID) - receivedProviders := append([]peer.ID(nil), 
receivedInProgressRequest.providersSoFar[0:]...)
+	returnedProviders := make(chan peer.AddrInfo)
+	receivedProviders := append([]peer.AddrInfo(nil), receivedInProgressRequest.providersSoFar[0:]...)
 	incomingProviders := receivedInProgressRequest.incoming
+	// Count how many providers we have received from our workers so far.
+	// These should be peers we managed to connect to.
+	total := len(receivedProviders)
 	go func() {
 		defer close(returnedProviders)
 		defer onCloseFn()
-		outgoingProviders := func() chan<- peer.ID {
+		outgoingProviders := func() chan<- peer.AddrInfo {
 			if len(receivedProviders) == 0 {
 				return nil
 			}
 			return returnedProviders
 		}
-		nextProvider := func() peer.ID {
+		nextProvider := func() peer.AddrInfo {
 			if len(receivedProviders) == 0 {
-				return ""
+				return peer.AddrInfo{}
 			}
 			return receivedProviders[0]
 		}
+
+		stopWhenMaxReached := func() {
+			if max > 0 && total >= max {
+				if incomingProviders != nil {
+					// Drain incomingProviders.
+					pqm.cancelProviderRequest(sessionCtx, k, incomingProviders)
+					incomingProviders = nil
+				}
+			}
+		}
+
+		// Handle the case where providersSoFar already contains more
+		// providers than we need.
+		stopWhenMaxReached()
+
 		for len(receivedProviders) > 0 || incomingProviders != nil {
 			select {
 			case <-pqm.ctx.Done():
@@ -199,6 +272,13 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
 					incomingProviders = nil
 				} else {
 					receivedProviders = append(receivedProviders, provider)
+					total++
+					stopWhenMaxReached()
+					// We do not return here; we keep looping on
+					// the case below until
+					// len(receivedProviders) == 0, which
+					// means they have all been sent out
+					// via returnedProviders.
 				}
 			case outgoingProviders() <- nextProvider():
 				receivedProviders = receivedProviders[1:]
@@ -208,7 +288,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
 	return returnedProviders
 }
-func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k cid.Cid, incomingProviders chan peer.ID) {
+func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k cid.Cid, incomingProviders chan peer.AddrInfo) {
 	cancelMessageChannel := pqm.providerQueryMessages
 	for {
 		select {
@@ -247,20 +327,24 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
 			pqm.timeoutMutex.RUnlock()
 			span := trace.SpanFromContext(findProviderCtx)
 			span.AddEvent("StartFindProvidersAsync")
-			providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
+			// We set count == 0. We will cancel the query
+			// manually once we have enough. This assumes the
+			// ContentDiscovery implementation supports that, which is a
+			// requirement per the libp2p/core/routing interface.
+ providers := pqm.router.FindProvidersAsync(findProviderCtx, k, 0) wg := &sync.WaitGroup{} for p := range providers { wg.Add(1) - go func(p peer.ID) { + go func(p peer.AddrInfo) { defer wg.Done() - span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p))) - err := pqm.network.ConnectTo(findProviderCtx, p) - if err != nil { - span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p))) - log.Debugf("failed to connect to provider %s: %s", p, err) + span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p.ID))) + err := pqm.dialer.Connect(findProviderCtx, p) + if err != nil && err != swarm.ErrDialToSelf { + span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p.ID))) + log.Debugf("failed to connect to provider %s: %s", p.ID, err) return } - span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p))) + span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p.ID))) select { case pqm.providerQueryMessages <- &receivedProviderMessage{ ctx: fpr.ctx, @@ -334,7 +418,7 @@ func (pqm *ProviderQueryManager) run() { defer pqm.cleanupInProcessRequests() go pqm.providerRequestBufferWorker() - for i := 0; i < maxInProcessRequests; i++ { + for i := 0; i < pqm.maxInProcessRequests; i++ { go pqm.findProviderWorker() } @@ -403,7 +487,7 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { ctx = trace.ContextWithSpan(ctx, span) requestStatus = &inProgressRequestStatus{ - listeners: make(map[chan peer.ID]struct{}), + listeners: make(map[chan peer.AddrInfo]struct{}), ctx: ctx, cancelFn: cancelFn, } @@ -421,7 +505,7 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { } else { trace.SpanFromContext(npqm.ctx).AddEvent("JoinQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) } - inProgressChan := make(chan peer.ID) + inProgressChan := make(chan peer.AddrInfo) requestStatus.listeners[inProgressChan] = struct{}{} select { case npqm.inProgressRequestChan <- inProgressRequest{ diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go b/routing/providerquerymanager/providerquerymanager_test.go similarity index 69% rename from bitswap/client/internal/providerquerymanager/providerquerymanager_test.go rename to routing/providerquerymanager/providerquerymanager_test.go index 9deb77f99..b55c1debc 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go +++ b/routing/providerquerymanager/providerquerymanager_test.go @@ -13,27 +13,30 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -type fakeProviderNetwork struct { +type fakeProviderDialer struct { + connectError error + connectDelay time.Duration +} + +type fakeProviderDiscovery struct { peersFound []peer.ID - connectError error delay time.Duration - connectDelay time.Duration queriesMadeMutex sync.RWMutex queriesMade int liveQueries int } -func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.ID) error { - time.Sleep(fpn.connectDelay) - return fpn.connectError +func (fpd *fakeProviderDialer) Connect(context.Context, peer.AddrInfo) error { + time.Sleep(fpd.connectDelay) + return fpd.connectError } -func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { +func (fpn *fakeProviderDiscovery) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { fpn.queriesMadeMutex.Lock() fpn.queriesMade++ fpn.liveQueries++ 
fpn.queriesMadeMutex.Unlock() - incomingPeers := make(chan peer.ID) + incomingPeers := make(chan peer.AddrInfo) go func() { defer close(incomingPeers) for _, p := range fpn.peersFound { @@ -44,7 +47,7 @@ func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Ci default: } select { - case incomingPeers <- p: + case incomingPeers <- peer.AddrInfo{ID: p}: case <-ctx.Done(): return } @@ -57,28 +60,36 @@ func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Ci return incomingPeers } +func mustNotErr[T any](out T, err error) T { + if err != nil { + panic(err) + } + return out +} + func TestNormalSimultaneousFetch(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ + fpd := &fakeProviderDialer{} + fpn := &fakeProviderDiscovery{ peersFound: peers, delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := mustNotErr(New(ctx, fpd, fpn)) providerQueryManager.Startup() keys := random.Cids(2) sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0]) - secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[1]) + firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0) + secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[1], 0) - var firstPeersReceived []peer.ID + var firstPeersReceived []peer.AddrInfo for p := range firstRequestChan { firstPeersReceived = append(firstPeersReceived, p) } - var secondPeersReceived []peer.ID + var secondPeersReceived []peer.AddrInfo for p := range secondRequestChan { secondPeersReceived = append(secondPeersReceived, p) } @@ -96,26 +107,27 @@ func TestNormalSimultaneousFetch(t *testing.T) { func TestDedupingProviderRequests(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ + fpd := &fakeProviderDialer{} + fpn := &fakeProviderDiscovery{ peersFound: peers, delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := mustNotErr(New(ctx, fpd, fpn)) providerQueryManager.Startup() key := random.Cids(1)[0] sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) + firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0) + secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0) - var firstPeersReceived []peer.ID + var firstPeersReceived []peer.AddrInfo for p := range firstRequestChan { firstPeersReceived = append(firstPeersReceived, p) } - var secondPeersReceived []peer.ID + var secondPeersReceived []peer.AddrInfo for p := range secondRequestChan { secondPeersReceived = append(secondPeersReceived, p) } @@ -136,12 +148,13 @@ func TestDedupingProviderRequests(t *testing.T) { func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ + fpd := &fakeProviderDialer{} + fpn := &fakeProviderDiscovery{ peersFound: peers, delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := mustNotErr(New(ctx, fpd, fpn)) providerQueryManager.Startup() key := random.Cids(1)[0] @@ -149,17 +162,17 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) 
{ // first session will cancel before done firstSessionCtx, firstCancel := context.WithTimeout(ctx, 3*time.Millisecond) defer firstCancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(firstSessionCtx, key) + firstRequestChan := providerQueryManager.FindProvidersAsync(firstSessionCtx, key, 0) secondSessionCtx, secondCancel := context.WithTimeout(ctx, 5*time.Second) defer secondCancel() - secondRequestChan := providerQueryManager.FindProvidersAsync(secondSessionCtx, key) + secondRequestChan := providerQueryManager.FindProvidersAsync(secondSessionCtx, key, 0) - var firstPeersReceived []peer.ID + var firstPeersReceived []peer.AddrInfo for p := range firstRequestChan { firstPeersReceived = append(firstPeersReceived, p) } - var secondPeersReceived []peer.ID + var secondPeersReceived []peer.AddrInfo for p := range secondRequestChan { secondPeersReceived = append(secondPeersReceived, p) } @@ -180,29 +193,30 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) { func TestCancelManagerExitsGracefully(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ + fpd := &fakeProviderDialer{} + fpn := &fakeProviderDiscovery{ peersFound: peers, delay: 1 * time.Millisecond, } ctx := context.Background() managerCtx, managerCancel := context.WithTimeout(ctx, 5*time.Millisecond) defer managerCancel() - providerQueryManager := New(managerCtx, fpn) + providerQueryManager := mustNotErr(New(managerCtx, fpd, fpn)) providerQueryManager.Startup() key := random.Cids(1)[0] sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) + firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0) + secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0) - var firstPeersReceived []peer.ID + var firstPeersReceived []peer.AddrInfo for p := range firstRequestChan { firstPeersReceived = append(firstPeersReceived, p) } - var secondPeersReceived []peer.ID + var secondPeersReceived []peer.AddrInfo for p := range secondRequestChan { secondPeersReceived = append(secondPeersReceived, p) } @@ -215,28 +229,30 @@ func TestCancelManagerExitsGracefully(t *testing.T) { func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, + fpd := &fakeProviderDialer{ connectError: errors.New("not able to connect"), - delay: 1 * time.Millisecond, + } + fpn := &fakeProviderDiscovery{ + peersFound: peers, + delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := mustNotErr(New(ctx, fpd, fpn)) providerQueryManager.Startup() key := random.Cids(1)[0] sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) + firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0) + secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0) - var firstPeersReceived []peer.ID + var firstPeersReceived []peer.AddrInfo for p := range firstRequestChan { firstPeersReceived = append(firstPeersReceived, p) } - var secondPeersReceived []peer.ID + var secondPeersReceived []peer.AddrInfo for p := range secondRequestChan { 
secondPeersReceived = append(secondPeersReceived, p) } @@ -248,38 +264,39 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) { func TestRateLimitingRequests(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ + fpd := &fakeProviderDialer{} + fpn := &fakeProviderDiscovery{ peersFound: peers, delay: 5 * time.Millisecond, } ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() - providerQueryManager := New(ctx, fpn) + providerQueryManager := mustNotErr(New(ctx, fpd, fpn)) providerQueryManager.Startup() - keys := random.Cids(maxInProcessRequests + 1) + keys := random.Cids(providerQueryManager.maxInProcessRequests + 1) sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - var requestChannels []<-chan peer.ID - for i := 0; i < maxInProcessRequests+1; i++ { - requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i])) + var requestChannels []<-chan peer.AddrInfo + for i := 0; i < providerQueryManager.maxInProcessRequests+1; i++ { + requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i], 0)) } time.Sleep(20 * time.Millisecond) fpn.queriesMadeMutex.Lock() - if fpn.liveQueries != maxInProcessRequests { + if fpn.liveQueries != providerQueryManager.maxInProcessRequests { t.Logf("Queries made: %d\n", fpn.liveQueries) t.Fatal("Did not limit parallel requests to rate limit") } fpn.queriesMadeMutex.Unlock() - for i := 0; i < maxInProcessRequests+1; i++ { + for i := 0; i < providerQueryManager.maxInProcessRequests+1; i++ { for range requestChannels[i] { } } fpn.queriesMadeMutex.Lock() defer fpn.queriesMadeMutex.Unlock() - if fpn.queriesMade != maxInProcessRequests+1 { + if fpn.queriesMade != providerQueryManager.maxInProcessRequests+1 { t.Logf("Queries made: %d\n", fpn.queriesMade) t.Fatal("Did not make all separate requests") } @@ -287,20 +304,21 @@ func TestRateLimitingRequests(t *testing.T) { func TestFindProviderTimeout(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ + fpd := &fakeProviderDialer{} + fpn := &fakeProviderDiscovery{ peersFound: peers, delay: 10 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := mustNotErr(New(ctx, fpd, fpn)) providerQueryManager.Startup() - providerQueryManager.SetFindProviderTimeout(2 * time.Millisecond) + providerQueryManager.setFindProviderTimeout(2 * time.Millisecond) keys := random.Cids(1) sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0]) - var firstPeersReceived []peer.ID + firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0) + var firstPeersReceived []peer.AddrInfo for p := range firstRequestChan { firstPeersReceived = append(firstPeersReceived, p) } @@ -311,19 +329,20 @@ func TestFindProviderTimeout(t *testing.T) { func TestFindProviderPreCanceled(t *testing.T) { peers := random.Peers(10) - fpn := &fakeProviderNetwork{ + fpd := &fakeProviderDialer{} + fpn := &fakeProviderDiscovery{ peersFound: peers, delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := mustNotErr(New(ctx, fpd, fpn)) providerQueryManager.Startup() - providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond) + providerQueryManager.setFindProviderTimeout(100 * time.Millisecond) keys := random.Cids(1) sessionCtx, 
cancel := context.WithCancel(ctx)
 	cancel()
-	firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0])
+	firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0)
 	if firstRequestChan == nil {
 		t.Fatal("expected non-nil channel")
 	}
@@ -336,18 +355,19 @@ func TestCancelFindProvidersAfterCompletion(t *testing.T) {
 	peers := random.Peers(2)
-	fpn := &fakeProviderNetwork{
+	fpd := &fakeProviderDialer{}
+	fpn := &fakeProviderDiscovery{
 		peersFound: peers,
 		delay:      1 * time.Millisecond,
 	}
 	ctx := context.Background()
-	providerQueryManager := New(ctx, fpn)
+	providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
 	providerQueryManager.Startup()
-	providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond)
+	providerQueryManager.setFindProviderTimeout(100 * time.Millisecond)
 	keys := random.Cids(1)
 	sessionCtx, cancel := context.WithCancel(ctx)
-	firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0])
+	firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0)
 	<-firstRequestChan // wait for everything to start.
 	time.Sleep(10 * time.Millisecond) // wait for the incoming providers to stop.
 	cancel() // cancel the context.
@@ -365,3 +385,27 @@
 		}
 	}
 }
+
+func TestLimitedProviders(t *testing.T) {
+	max := 5
+	peers := random.Peers(10)
+	fpd := &fakeProviderDialer{}
+	fpn := &fakeProviderDiscovery{
+		peersFound: peers,
+		delay:      1 * time.Millisecond,
+	}
+	ctx := context.Background()
+	providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxProviders(max)))
+	providerQueryManager.Startup()
+	providerQueryManager.setFindProviderTimeout(100 * time.Millisecond)
+	keys := random.Cids(1)
+
+	providersChan := providerQueryManager.FindProvidersAsync(ctx, keys[0], 0)
+	total := 0
+	for range providersChan {
+		total++
+	}
+	if total != max {
+		t.Fatalf("expected %d providers, got %d", max, total)
+	}
+}
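Taken together, the relocated ProviderQueryManager can now be constructed directly with custom limits and any dialer/router pair, for example a libp2p `host.Host` and a `ContentRouting` implementation. A minimal sketch of that usage; the `findProviders` helper, the package name, and the option values are illustrative assumptions:

```go
package example

import (
	"context"
	"fmt"

	"github.com/ipfs/boxo/routing/providerquerymanager"
	cid "github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/routing"
)

// findProviders wraps a content router with a ProviderQueryManager, which
// dedups in-flight queries, rate-limits them, and only returns providers
// it could actually connect to through the dialer (here, the libp2p host).
func findProviders(ctx context.Context, h host.Host, router routing.ContentRouting, c cid.Cid) error {
	pqm, err := providerquerymanager.New(ctx, h, router,
		providerquerymanager.WithMaxProviders(10),
		providerquerymanager.WithMaxInProcessRequests(6),
	)
	if err != nil {
		return err
	}
	pqm.Startup()

	// Passing max == 0 falls back to the configured MaxProviders.
	for p := range pqm.FindProvidersAsync(ctx, c, 0) {
		fmt.Println("provider:", p.ID)
	}
	return nil
}
```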