diff --git a/bitswap/benchmarks_test.go b/bitswap/benchmarks_test.go index 8d51b18971..4af3c6cae2 100644 --- a/bitswap/benchmarks_test.go +++ b/bitswap/benchmarks_test.go @@ -12,16 +12,16 @@ import ( "testing" "time" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-test/random" - protocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/ipfs/boxo/bitswap" bsnet "github.com/ipfs/boxo/bitswap/network" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" + mockrouting "github.com/ipfs/boxo/routing/mock" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" + "github.com/ipfs/go-test/random" + protocol "github.com/libp2p/go-libp2p/core/protocol" ) type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) @@ -134,6 +134,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) { benchmarkLog = nil fixedDelay := delay.Fixed(10 * time.Millisecond) bstoreLatency := time.Duration(0) + router := mockrouting.NewServer() for _, bch := range mixedBenches { b.Run(bch.name, func(b *testing.B) { @@ -148,10 +149,10 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) { oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne} oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)} oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)} - oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts) + oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, oldNetOpts, oldBsOpts) // Regular new Bitswap node - newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil) + newNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, nil, nil) var instances []testinstance.Instance // Create new nodes (fetchers + seeds) @@ -295,7 +296,8 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { for i := 0; i < b.N; i++ { net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instances := ig.Instances(numnodes) @@ -312,8 +314,8 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { net := tn.VirtualNetwork(d) - - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) instances := ig.Instances(numnodes) rootBlock := random.BlocksOfSize(1, rootBlockSize) @@ -327,8 +329,8 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) - - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instances := ig.Instances(numnodes) diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 08f7513dfa..c1983eb645 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -15,6 +15,7 @@ import ( "github.com/ipfs/boxo/bitswap/server" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" + mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" detectrace "github.com/ipfs/go-detect-race" @@ -51,7 +52,8 @@ const kNetworkDelay = 0 * time.Millisecond func TestClose(t *testing.T) { vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() block := random.BlocksOfSize(1, blockSize)[0] bitswap := ig.Next() @@ -65,12 +67,13 @@ func TestClose(t *testing.T) { func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() block := blocks.NewBlock([]byte("block")) pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) - err := ig.Routing().Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network + err := router.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network if err != nil { t.Fatal(err) } @@ -90,7 +93,8 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -119,7 +123,8 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)} - ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, bsOpts) defer ig.Close() hasBlock := ig.Next() @@ -153,7 +158,8 @@ func TestUnwantedBlockNotAdded(t *testing.T) { bsMessage := bsmsg.New(true) bsMessage.AddBlock(block) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -168,7 +174,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage) + doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Identity.ID(), bsMessage) blockInStore, err := doesNotWantBlock.Blockstore().Has(ctx, block.Cid()) if err != nil || blockInStore { @@ -187,7 +193,8 @@ func TestPendingBlockAdded(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) sessionBroadcastWantCapacity := 4 - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instance := ig.Instances(1)[0] @@ -276,7 +283,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.SkipNow() } net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{ + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, []bitswap.Option{ bitswap.TaskWorkerCount(5), bitswap.EngineTaskWorkerCount(5), bitswap.MaxOutstandingBytesPerPeer(1 << 20), @@ -332,15 +340,16 @@ func TestSendToWantingPeer(t *testing.T) { } net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() peers := ig.Instances(2) peerA := peers[0] peerB := peers[1] - t.Logf("Session %v\n", peerA.Peer) - t.Logf("Session %v\n", peerB.Peer) + t.Logf("Session %v\n", peerA.Identity.ID()) + t.Logf("Session %v\n", peerB.Identity.ID()) waitTime := time.Second * 5 @@ -369,7 +378,8 @@ func TestSendToWantingPeer(t *testing.T) { func TestEmptyKey(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() bs := ig.Instances(1)[0].Exchange @@ -402,7 +412,8 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6 func TestBasicBitswap(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test a one node trying to get one block from another") @@ -426,7 +437,7 @@ func TestBasicBitswap(t *testing.T) { // When second peer receives block, it should send out a cancel, so third // peer should no longer keep second peer's want if err = tu.WaitFor(ctx, func() error { - if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 { + if len(instances[2].Exchange.WantlistForPeer(instances[1].Identity.ID())) != 0 { return errors.New("should have no items in other peers wantlist") } if len(instances[1].Exchange.GetWantlist()) != 0 { @@ -473,7 +484,8 @@ func TestBasicBitswap(t *testing.T) { func TestDoubleGet(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test a one node trying to get one block from another") @@ -516,7 +528,7 @@ func TestDoubleGet(t *testing.T) { } t.Log(blk) case <-time.After(time.Second * 5): - p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer) + p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Identity.ID()) if len(p1wl) != 1 { t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl)) } else if !p1wl[0].Equals(blocks[0].Cid()) { @@ -537,7 +549,8 @@ func TestDoubleGet(t *testing.T) { func TestWantlistCleanup(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() instances := ig.Instances(2) @@ -658,7 +671,8 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt { func TestBitswapLedgerOneWay(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test ledgers match when one peer sends block to another") @@ -674,8 +688,8 @@ func TestBitswapLedgerOneWay(t *testing.T) { t.Fatal(err) } - ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) - rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + ra := instances[0].Exchange.LedgerForPeer(instances[1].Identity.ID()) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Identity.ID()) // compare peer ledger receipts err = assertLedgerMatch(ra, rb) @@ -706,7 +720,8 @@ func TestBitswapLedgerOneWay(t *testing.T) { func TestBitswapLedgerTwoWay(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil) defer ig.Close() t.Log("Test ledgers match when two peers send one block to each other") @@ -730,8 +745,8 @@ func TestBitswapLedgerTwoWay(t *testing.T) { t.Fatal(err) } - ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) - rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + ra := instances[0].Exchange.LedgerForPeer(instances[1].Identity.ID()) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Identity.ID()) // compare peer ledger receipts err = assertLedgerMatch(ra, rb) @@ -794,8 +809,9 @@ func (tsl *testingScoreLedger) Stop() { func TestWithScoreLedger(t *testing.T) { tsl := newTestingScoreLedger() net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + router := mockrouting.NewServer() bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)} - ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) + ig := testinstance.NewTestInstanceGenerator(net, router, nil, bsOpts) defer ig.Close() i := ig.Next() defer i.Exchange.Close() diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index f62ac1c241..258f417d96 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/boxo/bitswap/client/traceability" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" + mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" @@ -49,7 +50,8 @@ func TestBasicSessions(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() block := random.BlocksOfSize(1, blockSize)[0] @@ -81,7 +83,7 @@ func TestBasicSessions(t *testing.T) { t.Fatal("did not get tracable block") } - if traceBlock.From != b.Peer { + if traceBlock.From != b.Identity.ID() { t.Fatal("should have received block from peer B, did not") } } @@ -115,7 +117,8 @@ func TestSessionBetweenPeers(t *testing.T) { defer cancel() vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)}) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)}) defer ig.Close() inst := ig.Instances(10) @@ -150,7 +153,7 @@ func TestSessionBetweenPeers(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockListsFrom(inst[0].Peer, got, blks[i*10:(i+1)*10]); err != nil { + if err := assertBlockListsFrom(inst[0].Identity.ID(), got, blks[i*10:(i+1)*10]); err != nil { t.Fatal(err) } } @@ -174,7 +177,8 @@ func TestSessionSplitFetch(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() inst := ig.Instances(11) @@ -206,7 +210,7 @@ func TestSessionSplitFetch(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockListsFrom(inst[i].Peer, got, blks[i*10:(i+1)*10]); err != nil { + if err := assertBlockListsFrom(inst[i].Identity.ID(), got, blks[i*10:(i+1)*10]); err != nil { t.Fatal(err) } } @@ -217,7 +221,8 @@ func TestFetchNotConnected(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) defer ig.Close() other := ig.Next() @@ -248,7 +253,7 @@ func TestFetchNotConnected(t *testing.T) { for b := range ch { got = append(got, b) } - if err := assertBlockListsFrom(other.Peer, got, blks); err != nil { + if err := assertBlockListsFrom(other.Identity.ID(), got, blks); err != nil { t.Fatal(err) } } @@ -258,7 +263,8 @@ func TestFetchAfterDisconnect(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{ + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, []bitswap.Option{ bitswap.ProviderSearchDelay(10 * time.Millisecond), bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)), }) @@ -296,12 +302,12 @@ func TestFetchAfterDisconnect(t *testing.T) { got = append(got, b) } - if err := assertBlockListsFrom(peerA.Peer, got, blks[:5]); err != nil { + if err := assertBlockListsFrom(peerA.Identity.ID(), got, blks[:5]); err != nil { t.Fatal(err) } // Break connection - err = peerA.Adapter.DisconnectFrom(ctx, peerB.Peer) + err = peerA.Adapter.DisconnectFrom(ctx, peerB.Identity.ID()) if err != nil { t.Fatal(err) } @@ -325,7 +331,7 @@ func TestFetchAfterDisconnect(t *testing.T) { } } - if err := assertBlockListsFrom(peerA.Peer, got, blks); err != nil { + if err := assertBlockListsFrom(peerA.Identity.ID(), got, blks); err != nil { t.Fatal(err) } } @@ -335,7 +341,8 @@ func TestInterestCacheOverflow(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blks := random.BlocksOfSize(2049, blockSize) @@ -384,7 +391,8 @@ func TestPutAfterSessionCacheEvict(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blks := random.BlocksOfSize(2500, blockSize) @@ -421,7 +429,8 @@ func TestMultipleSessions(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blk := random.BlocksOfSize(1, blockSize)[0] @@ -461,7 +470,8 @@ func TestWantlistClearsOnCancel(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) + router := mockrouting.NewServer() + ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil) defer ig.Close() blks := random.BlocksOfSize(10, blockSize) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 5ae2d029dd..13d8a006a7 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -13,7 +13,6 @@ import ( bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue" "github.com/ipfs/boxo/bitswap/client/internal/notifications" bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager" - "github.com/ipfs/boxo/bitswap/client/internal/session" bssession "github.com/ipfs/boxo/bitswap/client/internal/session" bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager" bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager" @@ -203,7 +202,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr // into session.ProviderFinder when passing it, it will become // not nil. Related: // https://groups.google.com/g/golang-nuts/c/wnH302gBa4I?pli=1 - var pqm session.ProviderFinder + var pqm bssession.ProviderFinder if bs.pqm != nil { pqm = bs.pqm } diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index 6f9ee9305d..fae4258719 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -21,7 +21,7 @@ import ( // NewTestInstanceGenerator generates a new InstanceGenerator for the given // testnet -func NewTestInstanceGenerator(net tn.Network, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) InstanceGenerator { +func NewTestInstanceGenerator(net tn.Network, routing mockrouting.Server, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) InstanceGenerator { ctx, cancel := context.WithCancel(context.Background()) return InstanceGenerator{ net: net, @@ -30,7 +30,7 @@ func NewTestInstanceGenerator(net tn.Network, netOptions []bsnet.NetOpt, bsOptio cancel: cancel, bsOptions: bsOptions, netOptions: netOptions, - routing: mockrouting.NewServer(), + routing: routing, } } @@ -78,7 +78,7 @@ func ConnectInstances(instances []Instance) { for i, inst := range instances { for j := i + 1; j < len(instances); j++ { oinst := instances[j] - err := inst.Adapter.Connect(context.Background(), peer.AddrInfo{ID: oinst.Peer}) + err := inst.Adapter.Connect(context.Background(), peer.AddrInfo{ID: oinst.Identity.ID()}) if err != nil { panic(err.Error()) } @@ -86,14 +86,9 @@ func ConnectInstances(instances []Instance) { } } -// Routing returns the mock routing server -func (g *InstanceGenerator) Routing() mockrouting.Server { - return g.routing -} - // Instance is a test instance of bitswap + dependencies for integration testing type Instance struct { - Peer peer.ID + Identity tnet.Identity Exchange *bitswap.Bitswap blockstore blockstore.Blockstore Adapter bsnet.BitSwapNetwork @@ -131,10 +126,9 @@ func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p } bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...) - return Instance{ Adapter: adapter, - Peer: p.ID(), + Identity: p, Exchange: bs, Routing: router, blockstore: bstore, diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index e24e5b44e1..16c09ab2b0 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -195,7 +195,7 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { } } if s.provider != nil { - if err := s.provider.Provide(o.Cid()); err != nil { + if err := s.provider.Provide(ctx, o.Cid(), true); err != nil { logger.Errorf("Provide: %s", err.Error()) } } @@ -247,7 +247,7 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { } if s.provider != nil { for _, o := range toput { - if err := s.provider.Provide(o.Cid()); err != nil { + if err := s.provider.Provide(ctx, o.Cid(), true); err != nil { logger.Errorf("Provide: %s", err.Error()) } } @@ -310,7 +310,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func } } if provider != nil { - err = provider.Provide(blk.Cid()) + err = provider.Provide(ctx, blk.Cid(), true) if err != nil { return nil, err } @@ -420,7 +420,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet } if provider != nil { - err = provider.Provide(b.Cid()) + err = provider.Provide(ctx, b.Cid(), true) if err != nil { logger.Errorf("could not tell the provider about new blocks: %s", err) return diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 9629a8074a..14fed0a17f 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -294,7 +294,7 @@ type wrappedBlockservice struct { type mockProvider []cid.Cid -func (p *mockProvider) Provide(c cid.Cid) error { +func (p *mockProvider) Provide(ctx context.Context, c cid.Cid, announce bool) error { *p = append(*p, c) return nil } diff --git a/blockservice/providing_blockstore.go b/blockservice/providing_blockstore.go index 7435f8ae29..2ca91a9087 100644 --- a/blockservice/providing_blockstore.go +++ b/blockservice/providing_blockstore.go @@ -20,7 +20,7 @@ func (pbs providingBlockstore) Put(ctx context.Context, b blocks.Block) error { return err } - return pbs.provider.Provide(b.Cid()) + return pbs.provider.Provide(ctx, b.Cid(), true) } func (pbs providingBlockstore) PutMany(ctx context.Context, b []blocks.Block) error { @@ -29,7 +29,7 @@ func (pbs providingBlockstore) PutMany(ctx context.Context, b []blocks.Block) er } for _, b := range b { - if err := pbs.provider.Provide(b.Cid()); err != nil { + if err := pbs.provider.Provide(ctx, b.Cid(), true); err != nil { return err // this can only error if the whole provider is done for } } diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index e32b10b99d..012100b555 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -10,14 +10,15 @@ import ( // Mocks returns |n| connected mock Blockservices func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) - sg := testinstance.NewTestInstanceGenerator(net, nil, nil) - + net := tn.VirtualNetwork(delay.Fixed(0)) + routing := mockrouting.NewServer() + sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) instances := sg.Instances(n) var servs []blockservice.BlockService for _, i := range instances { - servs = append(servs, blockservice.New(i.Blockstore(), i.Exchange, opts...)) + servs = append(servs, blockservice.New(i.Blockstore(), + i.Exchange, append(opts, blockservice.WithProvider(i.Routing))...)) } return servs } diff --git a/examples/bitswap-transfer/main.go b/examples/bitswap-transfer/main.go index 921dca3fab..fc2d5ded3b 100644 --- a/examples/bitswap-transfer/main.go +++ b/examples/bitswap-transfer/main.go @@ -32,7 +32,6 @@ import ( unixfile "github.com/ipfs/boxo/ipld/unixfs/file" "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" bsclient "github.com/ipfs/boxo/bitswap/client" bsnet "github.com/ipfs/boxo/bitswap/network" @@ -178,15 +177,15 @@ func startDataServer(ctx context.Context, h host.Host) (cid.Cid, *bsserver.Serve // Start listening on the Bitswap protocol // For this example we're not leveraging any content routing (DHT, IPNI, delegated routing requests, etc.) as we know the peer we are fetching from - n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{}) + n := bsnet.NewFromIpfsHost(h) bswap := bsserver.New(ctx, n, bs) n.Start(bswap) return nd.Cid(), bswap, nil } func runClient(ctx context.Context, h host.Host, c cid.Cid, targetPeer string) ([]byte, error) { - n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{}) - bswap := bsclient.New(ctx, n, blockstore.NewBlockstore(datastore.NewNullDatastore())) + n := bsnet.NewFromIpfsHost(h) + bswap := bsclient.New(ctx, n, nil, blockstore.NewBlockstore(datastore.NewNullDatastore())) n.Start(bswap) defer bswap.Close() diff --git a/fetcher/helpers/block_visitor_test.go b/fetcher/helpers/block_visitor_test.go index 57d3e11ada..9d7db990a1 100644 --- a/fetcher/helpers/block_visitor_test.go +++ b/fetcher/helpers/block_visitor_test.go @@ -44,8 +44,9 @@ func TestFetchGraphToBlocks(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -94,8 +95,9 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) diff --git a/fetcher/impl/blockservice/fetcher_test.go b/fetcher/impl/blockservice/fetcher_test.go index 5a0b071f48..95152e5881 100644 --- a/fetcher/impl/blockservice/fetcher_test.go +++ b/fetcher/impl/blockservice/fetcher_test.go @@ -38,8 +38,9 @@ func TestFetchIPLDPrimeNode(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -87,8 +88,9 @@ func TestFetchIPLDGraph(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -143,8 +145,9 @@ func TestFetchIPLDPath(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -206,9 +209,9 @@ func TestHelpers(t *testing.T) { na.AssembleEntry("nonlink").AssignString("zoo") }) })) - - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) @@ -321,8 +324,9 @@ func TestNodeReification(t *testing.T) { na.AssembleEntry("link4").AssignLink(link4) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) - ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + routing := mockrouting.NewServer() + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) + ig := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) defer ig.Close() peers := ig.Instances(2) diff --git a/gateway/backend_blocks.go b/gateway/backend_blocks.go index 3d4f98bf0e..cbaaf90781 100644 --- a/gateway/backend_blocks.go +++ b/gateway/backend_blocks.go @@ -109,7 +109,7 @@ func NewRemoteBlocksBackend(gatewayURL []string, httpClient *http.Client, opts . return nil, err } - blockService := blockservice.New(blockStore, offline.Exchange(blockStore)) + blockService := blockservice.New(blockStore, offline.Exchange(blockStore), nil) return NewBlocksBackend(blockService, append(opts, WithValueStore(valueStore))...) } diff --git a/provider/noop.go b/provider/noop.go index 5367ccb306..50c3e3502f 100644 --- a/provider/noop.go +++ b/provider/noop.go @@ -19,7 +19,7 @@ func (op *noopProvider) Close() error { return nil } -func (op *noopProvider) Provide(cid.Cid) error { +func (op *noopProvider) Provide(context.Context, cid.Cid, bool) error { return nil } diff --git a/provider/provider.go b/provider/provider.go index a20a805cb7..4197f3dae3 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -18,7 +18,7 @@ var logR = logging.Logger("reprovider.simple") // Provider announces blocks to the network type Provider interface { // Provide takes a cid and makes an attempt to announce it to the network - Provide(cid.Cid) error + Provide(context.Context, cid.Cid, bool) error } // Reprovider reannounces blocks to the network diff --git a/provider/reprovider.go b/provider/reprovider.go index 219bacc753..048a2067dd 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -455,7 +455,7 @@ func (s *reprovider) Close() error { return err } -func (s *reprovider) Provide(cid cid.Cid) error { +func (s *reprovider) Provide(ctx context.Context, cid cid.Cid, announce bool) error { return s.q.Enqueue(cid) } diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go index 4ae58148e6..ceb72f97b3 100644 --- a/provider/reprovider_test.go +++ b/provider/reprovider_test.go @@ -198,7 +198,7 @@ func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { sys, err := New(ds) assert.NoError(t, err) - err = sys.Provide(c) + err = sys.Provide(context.Background(), c, true) assert.NoError(t, err) err = sys.Close()