Skip to content

Commit

Permalink
Rename maxInProcessRequests to maxConcurrentFinds
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Nov 22, 2024
1 parent 79a90ab commit 7cdeb4b
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 29 deletions.
4 changes: 2 additions & 2 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network,
bspqm.WithMaxProvidersPerFind(opts.pqmMaxProvidersPerFind),
bspqm.WithMaxInProcessRequests(opts.pqmMaxInProcessRequests))
bspqm.WithMaxConcurrentFinds(opts.pqmMaxConcurrentFinds),
bspqm.WithMaxProvidersPerFind(opts.pqmMaxProvidersPerFind))

sessionFactory := func(
sessctx context.Context,
Expand Down
24 changes: 12 additions & 12 deletions bitswap/client/internal/providerquerymanager/options.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
package providerquerymanager

const (
defaultMaxProvidersPerFind = 10
defaultMaxInProcessRequests = 6
defaultMaxConcurrentFinds = 16
defaultMaxProvidersPerFind = 10
)

type config struct {
maxProvidersPerFind int
maxInProcessRequests int
maxConcurrentFinds int
maxProvidersPerFind int
}

type Option func(*config)

func getOpts(opts []Option) config {
cfg := config{
maxProvidersPerFind: defaultMaxProvidersPerFind,
maxInProcessRequests: defaultMaxInProcessRequests,
maxConcurrentFinds: defaultMaxConcurrentFinds,
maxProvidersPerFind: defaultMaxProvidersPerFind,
}
for _, opt := range opts {
opt(&cfg)
}
return cfg
}

func WithMaxProvidersPerFind(n int) Option {
func WithMaxConcurrentFinds(n int) Option {
return func(c *config) {
if n == 0 {
n = defaultMaxProvidersPerFind
n = defaultMaxConcurrentFinds
}
c.maxProvidersPerFind = n
c.maxConcurrentFinds = n
}
}

func WithMaxInProcessRequests(n int) Option {
func WithMaxProvidersPerFind(n int) Option {
return func(c *config) {
if n == 0 {
n = defaultMaxInProcessRequests
n = defaultMaxProvidersPerFind
}
c.maxInProcessRequests = n
c.maxProvidersPerFind = n
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k ci

func (pqm *ProviderQueryManager) findProviderWorker() {
// findProviderWorker just cycles through incoming provider queries one at
// a time. There are maxInProcessRequests of these workers running
// a time. There are pqm.opts.maxConcurrentFinds of these workers running
// concurrently to let requests go in parallel but keep them rate limited.
maxProviders := pqm.opts.maxProvidersPerFind
for {
Expand Down Expand Up @@ -312,8 +312,8 @@ func (pqm *ProviderQueryManager) run() {
}()
}()

wg.Add(pqm.opts.maxInProcessRequests)
for i := 0; i < pqm.opts.maxInProcessRequests; i++ {
wg.Add(pqm.opts.maxConcurrentFinds)
for i := 0; i < pqm.opts.maxConcurrentFinds; i++ {
go func() {
pqm.findProviderWorker()
wg.Done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,29 +258,29 @@ func TestRateLimitingRequests(t *testing.T) {
providerQueryManager := New(ctx, fpn)
providerQueryManager.Startup()

maxInProcessRequests := providerQueryManager.opts.maxInProcessRequests
keys := random.Cids(maxInProcessRequests + 1)
maxConcurrentFinds := providerQueryManager.opts.maxConcurrentFinds
keys := random.Cids(maxConcurrentFinds + 1)
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var requestChannels []<-chan peer.ID
for i := 0; i < maxInProcessRequests+1; i++ {
for i := 0; i < maxConcurrentFinds+1; i++ {
requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i]))
}
time.Sleep(20 * time.Millisecond)
fpn.queriesMadeMutex.Lock()
if fpn.liveQueries != maxInProcessRequests {
if fpn.liveQueries != maxConcurrentFinds {
t.Logf("Queries made: %d\n", fpn.liveQueries)
t.Fatal("Did not limit parallel requests to rate limit")
}
fpn.queriesMadeMutex.Unlock()
for i := 0; i < maxInProcessRequests+1; i++ {
for i := 0; i < maxConcurrentFinds+1; i++ {
for range requestChannels[i] {
}
}

fpn.queriesMadeMutex.Lock()
defer fpn.queriesMadeMutex.Unlock()
if fpn.queriesMade != maxInProcessRequests+1 {
if fpn.queriesMade != maxConcurrentFinds+1 {
t.Logf("Queries made: %d\n", fpn.queriesMade)
t.Fatal("Did not make all separate requests")
}
Expand Down
12 changes: 6 additions & 6 deletions bitswap/client/optios.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type clientConfig struct {
tracer tracer.Tracer

// ProviderQueryManager options.
pqmMaxProvidersPerFind int
pqmMaxInProcessRequests int
pqmMaxConcurrentFinds int
pqmMaxProvidersPerFind int
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -92,14 +92,14 @@ func WithoutDuplicatedBlockStats() Option {
}
}

func WithPQMMaxProvidersPerFind(n int) Option {
func WithPQMMaxConcurrentFinds(n int) Option {
return func(c *clientConfig) {
c.pqmMaxProvidersPerFind = n
c.pqmMaxConcurrentFinds = n
}
}

func WithPQMMaxInProcessRequests(n int) Option {
func WithPQMMaxProvidersPerFind(n int) Option {
return func(c *clientConfig) {
c.pqmMaxInProcessRequests = n
c.pqmMaxProvidersPerFind = n
}
}

0 comments on commit 7cdeb4b

Please sign in to comment.