Skip to content

Commit

Permalink
removed Startup function from ProviderQueryManager (#741)
Browse files Browse the repository at this point in the history
* removed Startup function from ProviderQueryManager

Now `providerquerymanager.New` creates a `ProvicerQueryManager` that is already started.

There is no use case for starting PQM at a later time than it is created. Removing the need to call a `Statup` function separately from `New` is more convenient and reduces the opportunity for a problem if calling `Startup` is missed or if called multiple times.

* Remove flaky portion of test - requires synchronization to test sucessive timer delays
  • Loading branch information
gammazero authored Dec 5, 2024
1 parent e9446bb commit f6befaf
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The following emojis are used to highlight certain changes:
- `routing/http/server`: exposes Prometheus metrics on `prometheus.DefaultRegisterer` and a custom one can be provided via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
- `gateway`: `NewCacheBlockStore` and `NewCarBackend` will use `prometheus.DefaultRegisterer` when a custom one is not specified via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
- `filestore`: added opt-in `WithMMapReader` option to `FileManager` to enable memory-mapped file reads [#665](https://github.com/ipfs/boxo/pull/665)
- `bitswap/routing` `ProviderQueryManager` does not require calling `Startup` separate from `New`. [#741](https://github.com/ipfs/boxo/pull/741)

### Changed

Expand Down
1 change: 0 additions & 1 deletion bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func TestCustomProviderQueryManager(t *testing.T) {
if err != nil {
t.Fatal(err)
}
pqm.Startup()
bs := bitswap.New(ctx, a.Adapter, pqm, a.Blockstore,
bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false)))
a.Exchange.Close() // close old to be sure.
Expand Down
1 change: 0 additions & 1 deletion bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
// Should not be possible to hit this
panic(err)
}
pqm.Startup()
bs.pqm = pqm
}

Expand Down
16 changes: 0 additions & 16 deletions bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
for _, block := range blks {
cids = append(cids, block.Cid())
}
startTick := time.Now()
_, err := session.GetBlocks(ctx, cids)
require.NoError(t, err, "error getting blocks")

Expand Down Expand Up @@ -389,7 +388,6 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
case <-ctx.Done():
t.Fatal("Did not find more peers")
}
firstTickLength := time.Since(startTick)

// Wait for another broadcast to occur
select {
Expand All @@ -402,7 +400,6 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
}

// Wait for another broadcast to occur
startTick = time.Now()
select {
case receivedWantReq := <-fpm.wantReqs:
if len(receivedWantReq.cids) < len(cids) {
Expand All @@ -412,14 +409,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
t.Fatal("Never rebroadcast want list")
}

// Tick should take longer
consecutiveTickLength := time.Since(startTick)
if firstTickLength > consecutiveTickLength {
t.Fatal("Should have increased tick length after first consecutive tick")
}

// Wait for another broadcast to occur
startTick = time.Now()
select {
case receivedWantReq := <-fpm.wantReqs:
if len(receivedWantReq.cids) < len(cids) {
Expand All @@ -429,12 +419,6 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
t.Fatal("Never rebroadcast want list")
}

// Tick should take longer
secondConsecutiveTickLength := time.Since(startTick)
if consecutiveTickLength > secondConsecutiveTickLength {
t.Fatal("Should have increased tick length after first consecutive tick")
}

// Should not have tried to find peers on consecutive ticks
select {
case <-fpf.findMorePeersRequested:
Expand Down
7 changes: 2 additions & 5 deletions routing/providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,9 @@ func New(ctx context.Context, dialer ProviderQueryDialer, router ProviderQueryRo
}
}

return pqm, nil
}

// Startup starts processing for the ProviderQueryManager.
func (pqm *ProviderQueryManager) Startup() {
go pqm.run()

return pqm, nil
}

type inProgressRequest struct {
Expand Down
11 changes: 0 additions & 11 deletions routing/providerquerymanager/providerquerymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func TestNormalSimultaneousFetch(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
providerQueryManager.Startup()
keys := random.Cids(2)

sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand Down Expand Up @@ -114,7 +113,6 @@ func TestDedupingProviderRequests(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
providerQueryManager.Startup()
key := random.Cids(1)[0]

sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand Down Expand Up @@ -155,7 +153,6 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
providerQueryManager.Startup()

key := random.Cids(1)[0]

Expand Down Expand Up @@ -202,7 +199,6 @@ func TestCancelManagerExitsGracefully(t *testing.T) {
managerCtx, managerCancel := context.WithTimeout(ctx, 5*time.Millisecond)
defer managerCancel()
providerQueryManager := mustNotErr(New(managerCtx, fpd, fpn))
providerQueryManager.Startup()

key := random.Cids(1)[0]

Expand Down Expand Up @@ -238,7 +234,6 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
providerQueryManager.Startup()

key := random.Cids(1)[0]

Expand Down Expand Up @@ -275,7 +270,6 @@ func TestRateLimitingRequests(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxInProcessRequests(maxInProcessRequests)))
providerQueryManager.Startup()

keys := random.Cids(maxInProcessRequests + 1)
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand Down Expand Up @@ -317,7 +311,6 @@ func TestUnlimitedRequests(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxInProcessRequests(0)))
providerQueryManager.Startup()

keys := random.Cids(inProcessRequests)
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand Down Expand Up @@ -355,7 +348,6 @@ func TestFindProviderTimeout(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxTimeout(2*time.Millisecond)))
providerQueryManager.Startup()
keys := random.Cids(1)

sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand All @@ -379,7 +371,6 @@ func TestFindProviderPreCanceled(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxTimeout(100*time.Millisecond)))
providerQueryManager.Startup()
keys := random.Cids(1)

sessionCtx, cancel := context.WithCancel(ctx)
Expand All @@ -404,7 +395,6 @@ func TestCancelFindProvidersAfterCompletion(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxTimeout(100*time.Millisecond)))
providerQueryManager.Startup()
keys := random.Cids(1)

sessionCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -437,7 +427,6 @@ func TestLimitedProviders(t *testing.T) {
}
ctx := context.Background()
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxProviders(max), WithMaxTimeout(100*time.Millisecond)))
providerQueryManager.Startup()
keys := random.Cids(1)

providersChan := providerQueryManager.FindProvidersAsync(ctx, keys[0], 0)
Expand Down

0 comments on commit f6befaf

Please sign in to comment.