-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bitswap: move providing -> Exchange-layer, providerQueryManager -> routing #641
Conversation
a6677c4
to
f538d1f
Compare
Codecov ReportAttention: Patch coverage is
@@ Coverage Diff @@
## main #641 +/- ##
==========================================
- Coverage 60.43% 60.35% -0.08%
==========================================
Files 243 244 +1
Lines 31059 31034 -25
==========================================
- Hits 18771 18732 -39
- Misses 10628 10633 +5
- Partials 1660 1669 +9
|
f538d1f
to
e95eeb2
Compare
FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID | ||
} | ||
|
||
type FindAllProviders struct { | ||
Router bsnet.BitSwapNetwork | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're going to keep the function here as not taking a count
the name should get changed from FindProvidersAsync
to one that doesn't collide with the more widely used function that does take a count so implementations can use both if they want.
provCh := make(chan peer.ID) | ||
wg := &sync.WaitGroup{} | ||
for p := range providers { | ||
wg.Add(1) | ||
go func(p peer.ID) { | ||
defer wg.Done() | ||
span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p))) | ||
err := r.Router.ConnectTo(ctx, p) | ||
if err != nil { | ||
span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p))) | ||
log.Debugf("failed to connect to provider %s: %s", p, err) | ||
return | ||
} | ||
span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p))) | ||
select { | ||
case provCh <- p: | ||
case <-ctx.Done(): | ||
return | ||
} | ||
}(p) | ||
} | ||
go func() { | ||
wg.Wait() | ||
close(provCh) | ||
}() | ||
return provCh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this was copied out of the ProviderQueryManager because the internals (or at least the tests) seem to rely on having the connections made explicit here. It seems reasonable to extract this up a level out of the routing component though.
@aschmahmann Does this mean we still want the provider query manager, and need to close #536 which removes it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this comes with the need not to replace the default provider itself, but to be able to create it by hand and turn the knobs (and guessing it's mostly about increasing the amount of workers).
It is similar to #535 although that includes some other things (FindProvidersAsync returns AddrInfos instead of PeerIDs, and Router and Network are separate things).
If we need it we can merge this, otherwise the direction of #535 seems better but I guess it's a more invasive change (haven't followed too much). Code-wise I don't see anything weird, so green light on that front.
|
||
func WithClientOption(opt client.Option) Option { | ||
return Option{opt} | ||
} | ||
|
||
func WithServerOption(opt server.Option) Option { | ||
return Option{opt} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was an unrelated change and should be a separate PR, I'll raise it separately. TLDR though is that bitswap.New
can take options for the client or the server, but right now the only way to construct them is if they're forwarded as regular Bitswap options, otherwise you have to construct the client and server yourself. This saves on duplication while being more explicit and was something I ran into here because a new client option was introduced.
bitswap/client/client.go
Outdated
func WithDefaultLookupManagement(b bool) Option { | ||
return func(bs *Client) { | ||
bs.useDefaultLookupManagement = true | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #535 , the Network is separated from the Router and that makes sense to me. As a user, if I want to use a custom Router, I am going to have to glue the normal Network (ConnectTo) and my custom Router (FindProvidersAsync) into a custom BitswapNetwork. I'm not sure if that requires much more refactoring though, given that #535 is based on other additional changes.
This would also allow to disable the ProviderQueryManager altogether when it is not passed in (here it's impossible).
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { | ||
out := make(chan peer.ID, max) | ||
go func() { | ||
defer close(out) | ||
providers := bsnet.routing.FindProvidersAsync(ctx, k, max) | ||
for info := range providers { | ||
if info.ID == bsnet.host.ID() { | ||
continue // ignore self as provider | ||
} | ||
bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL) | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case out <- info.ID: | ||
} | ||
} | ||
}() | ||
return out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of this is not needed because our query manager is doing Connect(peerAddr)
instead of ConnectTo(peerID)
. Addresses will be absorbed into the peerstore directly. At his point BSnet is only a convenience to have content router available where it is not passed explicitally.
span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p))) | ||
err := pqm.network.ConnectTo(findProviderCtx, p) | ||
span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p.ID))) | ||
err := pqm.dialer.Connect(findProviderCtx, p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The beauty is this can now be a libp2p.Host directly. No longer need to rely on the network having absorbed provider peer addresses into the peerstore prior to connecting because we have them here already.
bitswap/client/client.go
Outdated
pqm := bspqm.New(ctx, network) | ||
if bs.pqm == nil { // not set with the options | ||
// network can do dialing and also content routing. | ||
pqm, err := rpqm.New(ctx, network, network, rpqm.WithMaxProviders(10)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't want to pass-in a Host and a ContentRouter directly to the Client, we need to rely on Network, which has them, but it would be better to remove Content Routing capabilities from the Network because it is just wrapping the content Router and nothing else at this point.
lol I meant to just comment, not approve my own changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @hsanjuan for the changes. Looks reasonable to me, left a couple questions
@@ -51,7 +51,7 @@ func (c *client) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-c | |||
go func() { | |||
defer close(out) | |||
for i, p := range c.server.Providers(k) { | |||
if max <= i { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an unrelated fix due to this code not understanding that 0 means infinity, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know, you did this change 🤷♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, lol. Yeah, that's why it's there. I was noticing some weirdness with testing.
Sync with @aschmahmann :
I think this is the gist @aschmahmann . I think the last option would be ok (there's breakage but it both a) has a stronger reason b) is easy to fix. |
Do you mean to do this?
Existing consumers would migrate by moving their ContentRouter from bitswap.Network into the bitswap.Client option If so, that sounds reasonable to me. If you think pulling out the Providing is too much to do here we could also do something like:
|
ok, currently we are here... see my review for a couple of things |
bitswap/testnet/virtual.go
Outdated
@@ -254,20 +254,20 @@ func (nc *networkClient) Stats() bsnet.Stats { | |||
} | |||
|
|||
// FindProvidersAsync returns a channel of providers for the given key. | |||
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { | |||
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be removed now, but more important, we need to wire the mock content-router to the tests at a higher level (instance generator etc) as it is no longer in the network. currently the tests work without content routing.
6c00392
to
7359d9f
Compare
@@ -114,6 +114,7 @@ The following emojis are used to highlight certain changes: | |||
- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636) | |||
- `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651) | |||
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629) | |||
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reminder to fix this before merge
@@ -18,7 +18,7 @@ var logR = logging.Logger("reprovider.simple") | |||
// Provider announces blocks to the network | |||
type Provider interface { | |||
// Provide takes a cid and makes an attempt to announce it to the network | |||
Provide(cid.Cid) error | |||
Provide(context.Context, cid.Cid, bool) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have updated this interface so that it matches the signature from ContentRouting, so that we can drop this interface altogether if they merge libp2p/go-libp2p#3048
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was merged, now wait for libp2p release
) | ||
|
||
// NewTestInstanceGenerator generates a new InstanceGenerator for the given | ||
// testnet | ||
func NewTestInstanceGenerator(net tn.Network, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) InstanceGenerator { | ||
func NewTestInstanceGenerator(net tn.Network, routing mockrouting.Server, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) InstanceGenerator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to pass it in from tests because some tests do want to share a mock router among the different things that use it (Blockservice/Instance etc).
@@ -84,10 +88,11 @@ func ConnectInstances(instances []Instance) { | |||
|
|||
// Instance is a test instance of bitswap + dependencies for integration testing | |||
type Instance struct { | |||
Peer peer.ID | |||
Identity tnet.Identity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a test that needs the identity to create a mock client provider on a different provider server to test that blocks provided on a separate network are not received etc etc. so this was necessary.
// careful when bs.pqm is nil. Since we are type-casting it | ||
// into session.ProviderFinder when passing it, it will become | ||
// not nil. Related: | ||
// https://groups.google.com/g/golang-nuts/c/wnH302gBa4I?pli=1 | ||
var pqm bssession.ProviderFinder | ||
if bs.pqm != nil { | ||
pqm = bs.pqm | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allowing to pass nil parameters is not awesome either...
94d755b
to
0ca70f4
Compare
@aschmahmann @gammazero I have a question: I was going to port the "Provider" logic from bitswap server, with its workers and queues and what not. Then I saw that we have a provider.System (the reprovider) that does have queues, does batching and even stores pending provides to disk. Naively my thought is that, if such logic is already implemented in the reprovider, a providing.Exchange should not yet have its own way and we should just pass-in the reprovider when using it. Plus I suspect that we make use of things like Accelerated-DHT-providing, What do you think? |
Makes sense to me to leave it out. It's unclear how useful the parallel queues were beforehand, but it seems like the kind of thing that's situational enough that people can choose to add whatever queues they need and we should start off keeping it simple and encouraging reuse of the existing queue. |
b7c5f16
to
19cd67f
Compare
ipfs/kubo#10595 is testing this. |
This PR performs a rather large and touchy refactor of things related to Content providing and Content discovery previously embedded into Bitswap. The motivations: * Make ProviderQueryManager options configurable * Align and separate coalesced layers: content routing must not be part of bitswap as in the future we will be using different exchanges (bitswap, http) for retrieval and content routing should be above exchange. * Align content routing interfaces with libp2p: to avoid crust, wrappers and user confusion, align Providers and Discovery types to libp2p.ContentRouting. * Reduce duplicated functionality: i.e. code that handles providing in multiple places and fails to take advantage of ProvideMany optimizations. As a result: * ProviderQueryManager is now part of the routing module * A new providing.Exchange has been created * Bitswap initialization params have changed and Bitswap Network doesn't provide anymore (see changelog for more details) Co-authored-by: Andrew Gillis <[email protected]> Co-authored-by: Adin Schmahmann <[email protected]>
8b304f2
to
6d9cc8b
Compare
Squashed |
fixes #640
This is an attempt at splitting the ProviderQueryManager out of Bitswap so that we can configure it more in consumers.