From da1e2d2c7b587e5e3aaeab899904a667aa340422 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Wed, 20 Nov 2024 17:01:35 +0100 Subject: [PATCH] exchange: create a providing.Exchange which provides on NotifyNewBlocks. --- bitswap/testinstance/testinstance.go | 15 +++--- blockservice/test/mock.go | 2 +- exchange/providing/providing.go | 36 +++++++++++++ exchange/providing/providing_test.go | 62 +++++++++++++++++++++++ fetcher/helpers/block_visitor_test.go | 8 +-- fetcher/impl/blockservice/fetcher_test.go | 20 ++++---- routing/mock/centralized_server.go | 7 ++- 7 files changed, 123 insertions(+), 27 deletions(-) create mode 100644 exchange/providing/providing.go create mode 100644 exchange/providing/providing_test.go diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index fae4258719..9786ef4712 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -9,6 +9,7 @@ import ( tn "github.com/ipfs/boxo/bitswap/testnet" blockstore "github.com/ipfs/boxo/blockstore" mockrouting "github.com/ipfs/boxo/routing/mock" + "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore" delayed "github.com/ipfs/go-datastore/delayed" ds_sync "github.com/ipfs/go-datastore/sync" @@ -89,18 +90,14 @@ func ConnectInstances(instances []Instance) { // Instance is a test instance of bitswap + dependencies for integration testing type Instance struct { Identity tnet.Identity + Datastore datastore.Batching Exchange *bitswap.Bitswap - blockstore blockstore.Blockstore + Blockstore blockstore.Blockstore Adapter bsnet.BitSwapNetwork Routing routing.Routing blockstoreDelay delay.D } -// Blockstore returns the block store for this test instance -func (i *Instance) Blockstore() blockstore.Blockstore { - return i.blockstore -} - // SetBlockstoreLatency customizes the artificial delay on receiving blocks // from a blockstore test instance. func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { @@ -118,8 +115,9 @@ func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p adapter := net.Adapter(p, netOptions...) dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) + ds := ds_sync.MutexWrap(dstore) bstore, err := blockstore.CachedBlockstore(ctx, - blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), + blockstore.NewBlockstore(ds), blockstore.DefaultCacheOpts()) if err != nil { panic(err.Error()) // FIXME perhaps change signature and return error. @@ -127,11 +125,12 @@ func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...) return Instance{ + Datastore: ds, Adapter: adapter, Identity: p, Exchange: bs, Routing: router, - blockstore: bstore, + Blockstore: bstore, blockstoreDelay: bsdelay, } } diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index 012100b555..be30658f8e 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -17,7 +17,7 @@ func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService { var servs []blockservice.BlockService for _, i := range instances { - servs = append(servs, blockservice.New(i.Blockstore(), + servs = append(servs, blockservice.New(i.Blockstore, i.Exchange, append(opts, blockservice.WithProvider(i.Routing))...)) } return servs diff --git a/exchange/providing/providing.go b/exchange/providing/providing.go new file mode 100644 index 0000000000..5e5d80c18c --- /dev/null +++ b/exchange/providing/providing.go @@ -0,0 +1,36 @@ +// Package providing implements an exchange wrapper which +// does content providing for new blocks. +package providing + +import ( + "context" + + "github.com/ipfs/boxo/exchange" + "github.com/ipfs/boxo/provider" + blocks "github.com/ipfs/go-block-format" +) + +// Exchange is an exchange wrapper that calls ProvideMany for blocks received +// over NotifyNewBlocks. +type Exchange struct { + exchange.Interface + provider provider.Provider +} + +// New creates a new providing Exchange with the given exchange and provider. +func New(base exchange.Interface, provider provider.Provider) *Exchange { + return &Exchange{ + Interface: base, + provider: provider, + } +} + +// NotifyNewBlocks calls provider.ProvideMany. +func (ex *Exchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + for _, b := range blocks { + if err := ex.provider.Provide(ctx, b.Cid(), true); err != nil { + return err + } + } + return nil +} diff --git a/exchange/providing/providing_test.go b/exchange/providing/providing_test.go new file mode 100644 index 0000000000..298ff56fcb --- /dev/null +++ b/exchange/providing/providing_test.go @@ -0,0 +1,62 @@ +package providing + +import ( + "context" + "testing" + + testinstance "github.com/ipfs/boxo/bitswap/testinstance" + tn "github.com/ipfs/boxo/bitswap/testnet" + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/provider" + mockrouting "github.com/ipfs/boxo/routing/mock" + delay "github.com/ipfs/go-ipfs-delay" + "github.com/ipfs/go-test/random" +) + +func TestExchange(t *testing.T) { + ctx := context.Background() + net := tn.VirtualNetwork(delay.Fixed(0)) + routing := mockrouting.NewServer() + sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil) + i := sg.Next() + provFinder := routing.Client(i.Identity) + prov, err := provider.New(i.Datastore, + provider.Online(provFinder), + ) + if err != nil { + t.Fatal(err) + } + provExchange := New(i.Exchange, prov) + // write-through so that we notify when re-adding block + bs := blockservice.New(i.Blockstore, provExchange, + blockservice.WriteThrough()) + block := random.BlocksOfSize(1, 10)[0] + // put it on the blockstore of the first instance + err = i.Blockstore.Put(ctx, block) + if err != nil { + t.Fatal() + } + + providersChan := provFinder.FindProvidersAsync(ctx, block.Cid(), 1) + _, ok := <-providersChan + if ok { + t.Fatal("there should be no providers yet for block") + } + + // Now add it via BlockService. It should trigger NotifyNewBlocks + // on the exchange and thus they should get announced. + err = bs.AddBlock(ctx, block) + if err != nil { + t.Fatal() + } + // Trigger reproviding, otherwise it's not really provided. + err = prov.Reprovide(ctx) + if err != nil { + t.Fatal(err) + } + providersChan = provFinder.FindProvidersAsync(ctx, block.Cid(), 1) + _, ok = <-providersChan + if !ok { + t.Fatal("there should be one provider for the block") + } +} diff --git a/fetcher/helpers/block_visitor_test.go b/fetcher/helpers/block_visitor_test.go index 9d7db990a1..9ea0eacd98 100644 --- a/fetcher/helpers/block_visitor_test.go +++ b/fetcher/helpers/block_visitor_test.go @@ -54,7 +54,7 @@ func TestFetchGraphToBlocks(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block1, block2, block3, block4} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -62,7 +62,7 @@ func TestFetchGraphToBlocks(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -104,7 +104,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) { hasBlock := peers[0] defer hasBlock.Exchange.Close() - err := hasBlock.Blockstore().PutMany(bg, []blocks.Block{block1, block2, block3}) + err := hasBlock.Blockstore.PutMany(bg, []blocks.Block{block1, block2, block3}) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, block1, block2, block3) @@ -113,7 +113,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) diff --git a/fetcher/impl/blockservice/fetcher_test.go b/fetcher/impl/blockservice/fetcher_test.go index 95152e5881..55c1d5c21e 100644 --- a/fetcher/impl/blockservice/fetcher_test.go +++ b/fetcher/impl/blockservice/fetcher_test.go @@ -47,7 +47,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) { hasBlock := peers[0] defer hasBlock.Exchange.Close() - err := hasBlock.Blockstore().Put(bg, block) + err := hasBlock.Blockstore.Put(bg, block) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, block) @@ -56,7 +56,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) @@ -98,7 +98,7 @@ func TestFetchIPLDGraph(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block1, block2, block3, block4} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -106,7 +106,7 @@ func TestFetchIPLDGraph(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -155,7 +155,7 @@ func TestFetchIPLDPath(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block1, block2, block3, block4, block5} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -163,7 +163,7 @@ func TestFetchIPLDPath(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) session := fetcherConfig.NewSession(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -219,7 +219,7 @@ func TestHelpers(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block1, block2, block3, block4} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -227,7 +227,7 @@ func TestHelpers(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) t.Run("Block retrieves node", func(t *testing.T) { fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) @@ -334,7 +334,7 @@ func TestNodeReification(t *testing.T) { defer hasBlock.Exchange.Close() blocks := []blocks.Block{block2, block3, block4} - err := hasBlock.Blockstore().PutMany(bg, blocks) + err := hasBlock.Blockstore.PutMany(bg, blocks) require.NoError(t, err) err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...) require.NoError(t, err) @@ -342,7 +342,7 @@ func TestNodeReification(t *testing.T) { wantsBlock := peers[1] defer wantsBlock.Exchange.Close() - wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange) + wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange) fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter) nodeReifier := func(lnkCtx ipld.LinkContext, nd ipld.Node, ls *ipld.LinkSystem) (ipld.Node, error) { return &selfLoader{Node: nd, ctx: lnkCtx.Ctx, ls: ls}, nil diff --git a/routing/mock/centralized_server.go b/routing/mock/centralized_server.go index d55de70814..85c7688146 100644 --- a/routing/mock/centralized_server.go +++ b/routing/mock/centralized_server.go @@ -39,7 +39,7 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error { rs.lock.Lock() defer rs.lock.Unlock() - k := c.KeyString() + k := c.Hash().String() _, ok := rs.providers[k] if !ok { @@ -54,16 +54,16 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error { func (rs *s) Providers(c cid.Cid) []peer.AddrInfo { rs.delayConf.Query.Wait() // before locking - rs.lock.RLock() defer rs.lock.RUnlock() - k := c.KeyString() + k := c.Hash().String() var ret []peer.AddrInfo records, ok := rs.providers[k] if !ok { return ret } + for _, r := range records { if time.Since(r.Created) > rs.delayConf.ValueVisibility.Get() { ret = append(ret, r.Peer) @@ -74,7 +74,6 @@ func (rs *s) Providers(c cid.Cid) []peer.AddrInfo { j := rand.Intn(i + 1) ret[i], ret[j] = ret[j], ret[i] } - return ret }