Skip to content

Commit

Permalink
exchange: create a providing.Exchange which provides on NotifyNewBlocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Nov 20, 2024
1 parent 6d74a8f commit da1e2d2
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 27 deletions.
15 changes: 7 additions & 8 deletions bitswap/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
tn "github.com/ipfs/boxo/bitswap/testnet"
blockstore "github.com/ipfs/boxo/blockstore"
mockrouting "github.com/ipfs/boxo/routing/mock"
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
delayed "github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -89,18 +90,14 @@ func ConnectInstances(instances []Instance) {
// Instance is a test instance of bitswap + dependencies for integration testing
type Instance struct {
Identity tnet.Identity
Datastore datastore.Batching
Exchange *bitswap.Bitswap
blockstore blockstore.Blockstore
Blockstore blockstore.Blockstore
Adapter bsnet.BitSwapNetwork
Routing routing.Routing
blockstoreDelay delay.D
}

// Blockstore returns the block store for this test instance
func (i *Instance) Blockstore() blockstore.Blockstore {
return i.blockstore
}

// SetBlockstoreLatency customizes the artificial delay on receiving blocks
// from a blockstore test instance.
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
Expand All @@ -118,20 +115,22 @@ func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p
adapter := net.Adapter(p, netOptions...)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))

ds := ds_sync.MutexWrap(dstore)
bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)),
blockstore.NewBlockstore(ds),
blockstore.DefaultCacheOpts())
if err != nil {
panic(err.Error()) // FIXME perhaps change signature and return error.
}

bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...)
return Instance{
Datastore: ds,
Adapter: adapter,
Identity: p,
Exchange: bs,
Routing: router,
blockstore: bstore,
Blockstore: bstore,
blockstoreDelay: bsdelay,
}
}
2 changes: 1 addition & 1 deletion blockservice/test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService {

var servs []blockservice.BlockService
for _, i := range instances {
servs = append(servs, blockservice.New(i.Blockstore(),
servs = append(servs, blockservice.New(i.Blockstore,
i.Exchange, append(opts, blockservice.WithProvider(i.Routing))...))
}
return servs
Expand Down
36 changes: 36 additions & 0 deletions exchange/providing/providing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Package providing implements an exchange wrapper which
// does content providing for new blocks.
package providing

import (
"context"

"github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/provider"
blocks "github.com/ipfs/go-block-format"
)

// Exchange is an exchange wrapper that calls ProvideMany for blocks received
// over NotifyNewBlocks.
type Exchange struct {
exchange.Interface
provider provider.Provider
}

// New creates a new providing Exchange with the given exchange and provider.
func New(base exchange.Interface, provider provider.Provider) *Exchange {
return &Exchange{
Interface: base,
provider: provider,
}
}

// NotifyNewBlocks calls provider.ProvideMany.
func (ex *Exchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
for _, b := range blocks {
if err := ex.provider.Provide(ctx, b.Cid(), true); err != nil {
return err
}
}
return nil
}
62 changes: 62 additions & 0 deletions exchange/providing/providing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package providing

import (
"context"
"testing"

testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/provider"
mockrouting "github.com/ipfs/boxo/routing/mock"
delay "github.com/ipfs/go-ipfs-delay"
"github.com/ipfs/go-test/random"
)

func TestExchange(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(delay.Fixed(0))
routing := mockrouting.NewServer()
sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil)
i := sg.Next()
provFinder := routing.Client(i.Identity)
prov, err := provider.New(i.Datastore,
provider.Online(provFinder),
)
if err != nil {
t.Fatal(err)
}
provExchange := New(i.Exchange, prov)
// write-through so that we notify when re-adding block
bs := blockservice.New(i.Blockstore, provExchange,
blockservice.WriteThrough())
block := random.BlocksOfSize(1, 10)[0]
// put it on the blockstore of the first instance
err = i.Blockstore.Put(ctx, block)
if err != nil {
t.Fatal()
}

providersChan := provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
_, ok := <-providersChan
if ok {
t.Fatal("there should be no providers yet for block")
}

// Now add it via BlockService. It should trigger NotifyNewBlocks
// on the exchange and thus they should get announced.
err = bs.AddBlock(ctx, block)
if err != nil {
t.Fatal()
}
// Trigger reproviding, otherwise it's not really provided.
err = prov.Reprovide(ctx)
if err != nil {
t.Fatal(err)
}
providersChan = provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
_, ok = <-providersChan
if !ok {
t.Fatal("there should be one provider for the block")
}
}
8 changes: 4 additions & 4 deletions fetcher/helpers/block_visitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func TestFetchGraphToBlocks(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

err := hasBlock.Blockstore().PutMany(bg, []blocks.Block{block1, block2, block3})
err := hasBlock.Blockstore.PutMany(bg, []blocks.Block{block1, block2, block3})
require.NoError(t, err)

err = hasBlock.Exchange.NotifyNewBlocks(bg, block1, block2, block3)
Expand All @@ -113,7 +113,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down
20 changes: 10 additions & 10 deletions fetcher/impl/blockservice/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

err := hasBlock.Blockstore().Put(bg, block)
err := hasBlock.Blockstore.Put(bg, block)
require.NoError(t, err)

err = hasBlock.Exchange.NotifyNewBlocks(bg, block)
Expand All @@ -56,7 +56,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())

Expand Down Expand Up @@ -98,15 +98,15 @@ func TestFetchIPLDGraph(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -155,15 +155,15 @@ func TestFetchIPLDPath(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4, block5}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -219,15 +219,15 @@ func TestHelpers(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)

t.Run("Block retrieves node", func(t *testing.T) {
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
Expand Down Expand Up @@ -334,15 +334,15 @@ func TestNodeReification(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
nodeReifier := func(lnkCtx ipld.LinkContext, nd ipld.Node, ls *ipld.LinkSystem) (ipld.Node, error) {
return &selfLoader{Node: nd, ctx: lnkCtx.Ctx, ls: ls}, nil
Expand Down
7 changes: 3 additions & 4 deletions routing/mock/centralized_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error {
rs.lock.Lock()
defer rs.lock.Unlock()

k := c.KeyString()
k := c.Hash().String()

_, ok := rs.providers[k]
if !ok {
Expand All @@ -54,16 +54,16 @@ func (rs *s) Announce(p peer.AddrInfo, c cid.Cid) error {

func (rs *s) Providers(c cid.Cid) []peer.AddrInfo {
rs.delayConf.Query.Wait() // before locking

rs.lock.RLock()
defer rs.lock.RUnlock()
k := c.KeyString()
k := c.Hash().String()

var ret []peer.AddrInfo
records, ok := rs.providers[k]
if !ok {
return ret
}

for _, r := range records {
if time.Since(r.Created) > rs.delayConf.ValueVisibility.Get() {
ret = append(ret, r.Peer)
Expand All @@ -74,7 +74,6 @@ func (rs *s) Providers(c cid.Cid) []peer.AddrInfo {
j := rand.Intn(i + 1)
ret[i], ret[j] = ret[j], ret[i]
}

return ret
}

Expand Down

0 comments on commit da1e2d2

Please sign in to comment.