Skip to content

Commit

Permalink
Bitswap: refactor content provider elements
Browse files Browse the repository at this point in the history
This PR performs a rather large and touchy refactor of things related to
Content providing and Content discovery previously embedded into Bitswap.

The motivations:

  * Make ProviderQueryManager options configurable

  * Align and separate coalesced layers: content routing must not be part of
  bitswap as in the future we will be using different exchanges (bitswap, http)
  for retrieval and content routing should be above exchange.

  * Align content routing interfaces with libp2p: to avoid crust, wrappers and
    user confusion, align Providers and Discovery types to
    libp2p.ContentRouting.

  * Reduce duplicated functionality: i.e. code that handles providing in
  multiple places and fails to take advantage of ProvideMany optimizations.

As a result:

  * ProviderQueryManager is now part of the routing module

  * A new providing.Exchange has been created

  * Bitswap initialization params have changed and Bitswap Network doesn't
    provide anymore (see changelog for more details)

Co-authored-by: Andrew Gillis <[email protected]>
Co-authored-by: Adin Schmahmann <[email protected]>
  • Loading branch information
3 people committed Nov 25, 2024
1 parent c91cc1d commit 6d9cc8b
Show file tree
Hide file tree
Showing 35 changed files with 725 additions and 550 deletions.
47 changes: 47 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,52 @@ The following emojis are used to highlight certain changes:

## [Unreleased]

- `bitswap`, `routing`, `exchange` ([#641](https://github.com/ipfs/boxo/pull/641)):
- ✨ Bitswap is no longer in charge of providing blocks to the newtork: providing functionality is now handled by a `exchange/providing.Exchange`, meant to be used with `provider.System` so that all provides follow the same rules (multiple parts of the code where handling provides) before.
- 🛠 `bitswap/client/internal/providerquerymanager` has been moved to `routing/providerquerymanager` where it belongs. In order to keep compatibility, Bitswap now receives a `routing.ContentDiscovery` parameter which implements `FindProvidersAsync(...)` and uses it to create a `providerquerymanager` with the default settings as before. Custom settings can be used by using a custom `providerquerymanager` to manually wrap a `ContentDiscovery` object and pass that in as `ContentDiscovery` on initialization while setting `bitswap.WithDefaultProviderQueryManager(false)` (to avoid re-wrapping it again).
- The renovated `providedQueryManager` will trigger lookups until it manages to connect to `MaxProviders`. Before it would lookup at most `MaxInProcessRequests*MaxProviders` and connection failures may have limited the actual number of providers found.
- 🛠 We have aligned our routing-related interfaces with the libp2p [`routing`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/routing#ContentRouting) ones, including in the `reprovider.System`.
- In order to obtain exactly the same behaviour as before (i.e. particularly ensuring that new blocks are still provided), what was done like:

```go
bswapnet := network.NewFromIpfsHost(host, contentRouter)
bswap := bitswap.New(p.ctx, bswapnet, blockstore)
bserv = blockservice.New(blockstore, bswap)
```
- becomes:

```go
// Create network: no contentRouter anymore
bswapnet := network.NewFromIpfsHost(host)
// Create Bitswap: a new "discovery" parameter, usually the "contentRouter"
// which does both discovery and providing.
bswap := bitswap.New(p.ctx, bswapnet, discovery, blockstore)
// A provider system that handles concurrent provides etc. "contentProvider"
// is usually the "contentRouter" which does both discovery and providing.
// "contentProvider" could be used directly without wrapping, but it is recommended
// to do so to provide more efficiently.
provider := provider.New(datastore, provider.Online(contentProvider)
// A wrapped providing exchange using the previous exchange and the provider.
exch := providing.New(bswap, provider)

// Finally the blockservice
bserv := blockservice.New(blockstore, exch)
...
```

- The above is only necessary if content routing is needed. Otherwise:

```
// Create network: no contentRouter anymore
bswapnet := network.NewFromIpfsHost(host)
// Create Bitswap: a new "discovery" parameter set to nil (disable content discovery)
bswap := bitswap.New(p.ctx, bswapnet, nil, blockstore)
// Finally the blockservice
bserv := blockservice.New(blockstore, exch)
```



### Added

- `routing/http/server`: added built-in Prometheus instrumentation to http delegated `/routing/v1/` endpoints, with custom buckets for response size and duration to match real world data observed at [the `delegated-ipfs.dev` instance](https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing). [#718](https://github.com/ipfs/boxo/pull/718) [#724](https://github.com/ipfs/boxo/pull/724)
Expand Down Expand Up @@ -117,6 +163,7 @@ The following emojis are used to highlight certain changes:
- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636)
- `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651)
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629)
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.

## [v0.21.0]

Expand Down
43 changes: 22 additions & 21 deletions bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ import (
"testing"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-test/random"
protocol "github.com/libp2p/go-libp2p/core/protocol"

"github.com/ipfs/boxo/bitswap"
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
"github.com/ipfs/go-test/random"
protocol "github.com/libp2p/go-libp2p/core/protocol"
)

type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)
Expand Down Expand Up @@ -135,24 +134,25 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
benchmarkLog = nil
fixedDelay := delay.Fixed(10 * time.Millisecond)
bstoreLatency := time.Duration(0)
router := mockrouting.NewServer()

for _, bch := range mixedBenches {
b.Run(bch.name, func(b *testing.B) {
fetcherCount := bch.fetcherCount
oldSeedCount := bch.oldSeedCount
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)

net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)
net := tn.VirtualNetwork(fixedDelay)

// Simulate an older Bitswap node (old protocol ID) that doesn't
// send DONT_HAVE responses
oldProtocol := []protocol.ID{bsnet.ProtocolBitswapOneOne}
oldNetOpts := []bsnet.NetOpt{bsnet.SupportedProtocols(oldProtocol)}
oldBsOpts := []bitswap.Option{bitswap.SetSendDontHaves(false)}
oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, oldNetOpts, oldBsOpts)
oldNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, oldNetOpts, oldBsOpts)

// Regular new Bitswap node
newNodeGenerator := testinstance.NewTestInstanceGenerator(net, nil, nil)
newNodeGenerator := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
var instances []testinstance.Instance

// Create new nodes (fetchers + seeds)
Expand Down Expand Up @@ -294,9 +294,10 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
numblks := 1000

for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
router := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
defer ig.Close()

instances := ig.Instances(numnodes)
Expand All @@ -312,9 +313,9 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.VirtualNetwork(mockrouting.NewServer(), d)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
net := tn.VirtualNetwork(d)
router := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)

instances := ig.Instances(numnodes)
rootBlock := random.BlocksOfSize(1, rootBlockSize)
Expand All @@ -327,9 +328,9 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)
router := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
defer ig.Close()

instances := ig.Instances(numnodes)
Expand Down Expand Up @@ -437,7 +438,7 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b

func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(context.Background(), blocks); err != nil {
if err := p.Blockstore.PutMany(context.Background(), blocks); err != nil {
b.Fatal(err)
}
}
Expand All @@ -452,10 +453,10 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
bill := provs[0]
jeff := provs[1]

if err := bill.Blockstore().PutMany(context.Background(), blks[:75]); err != nil {
if err := bill.Blockstore.PutMany(context.Background(), blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(context.Background(), blks[25:]); err != nil {
if err := jeff.Blockstore.PutMany(context.Background(), blks[25:]); err != nil {
b.Fatal(err)
}
}
Expand All @@ -473,12 +474,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
even := i%2 == 0
third := i%3 == 0
if third || even {
if err := bill.Blockstore().Put(context.Background(), blk); err != nil {
if err := bill.Blockstore.Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
if third || !even {
if err := jeff.Blockstore().Put(context.Background(), blk); err != nil {
if err := jeff.Blockstore.Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
Expand All @@ -490,7 +491,7 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
err := provs[rand.Intn(len(provs))].Blockstore().Put(context.Background(), blk)
err := provs[rand.Intn(len(provs))].Blockstore.Put(context.Background(), blk)
if err != nil {
b.Fatal(err)
}
Expand Down
16 changes: 4 additions & 12 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/internal/defaults"
"github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
Expand Down Expand Up @@ -45,9 +44,8 @@ type bitswap interface {
}

var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
)

type Bitswap struct {
Expand All @@ -58,7 +56,7 @@ type Bitswap struct {
net network.BitSwapNetwork
}

func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap {
func New(ctx context.Context, net network.BitSwapNetwork, providerFinder client.ProviderFinder, bstore blockstore.Blockstore, options ...Option) *Bitswap {
bs := &Bitswap{
net: net,
}
Expand All @@ -85,14 +83,10 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
bs.Client = client.New(ctx, net, providerFinder, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once

return bs
Expand All @@ -115,7 +109,6 @@ type Stat struct {
MessagesReceived uint64
BlocksSent uint64
DataSent uint64
ProvideBufLen int
}

func (bs *Bitswap) Stat() (*Stat, error) {
Expand All @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
ProvideBufLen: ss.ProvideBufLen,
}, nil
}

Expand Down
Loading

0 comments on commit 6d9cc8b

Please sign in to comment.