Skip to content

Commit

Permalink
feat(routing/providerquerymanager): allow for configurable WithMaxInP…
Browse files Browse the repository at this point in the history
…rocessRequests option
  • Loading branch information
aschmahmann committed Jul 26, 2024
1 parent 5a984ca commit e739a0c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
19 changes: 15 additions & 4 deletions routing/providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
var log = logging.Logger("routing/provqrymgr")

const (
maxProviders = 10
maxInProcessRequests = 6
defaultTimeout = 10 * time.Second
maxProviders = 10
defaultMaxInProcessRequests = 6
defaultTimeout = 10 * time.Second
)

type inProgressRequestStatus struct {
Expand Down Expand Up @@ -85,6 +85,8 @@ type ProviderQueryManager struct {
findProviderTimeout time.Duration
timeoutMutex sync.RWMutex

maxInProcessRequests int

// do not touch outside the run loop
inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus
}
Expand All @@ -98,6 +100,14 @@ func WithMaxTimeout(timeout time.Duration) Option {
}
}

// WithMaxInProcessRequests is the maximum number of requests that can be processed in parallel
func WithMaxInProcessRequests(count int) Option {
return func(mgr *ProviderQueryManager) error {
mgr.maxInProcessRequests = count
return nil
}
}

// New initializes a new ProviderQueryManager for a given context and a given
// network provider.
func New(ctx context.Context, network ProviderQueryNetwork, opts ...Option) (*ProviderQueryManager, error) {
Expand All @@ -109,6 +119,7 @@ func New(ctx context.Context, network ProviderQueryNetwork, opts ...Option) (*Pr
incomingFindProviderRequests: make(chan *findProviderRequest),
inProgressRequestStatuses: make(map[cid.Cid]*inProgressRequestStatus),
findProviderTimeout: defaultTimeout,
maxInProcessRequests: defaultMaxInProcessRequests,
}

for _, o := range opts {
Expand Down Expand Up @@ -351,7 +362,7 @@ func (pqm *ProviderQueryManager) run() {
defer pqm.cleanupInProcessRequests()

go pqm.providerRequestBufferWorker()
for i := 0; i < maxInProcessRequests; i++ {
for i := 0; i < pqm.maxInProcessRequests; i++ {
go pqm.findProviderWorker()
}

Expand Down
10 changes: 5 additions & 5 deletions routing/providerquerymanager/providerquerymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,28 +266,28 @@ func TestRateLimitingRequests(t *testing.T) {
providerQueryManager := mustNotErr(New(ctx, fpn))
providerQueryManager.Startup()

keys := generateCids(defaultMaxInProcessRequests + 1)
keys := generateCids(providerQueryManager.maxInProcessRequests + 1)
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var requestChannels []<-chan peer.ID
for i := 0; i < defaultMaxInProcessRequests+1; i++ {
for i := 0; i < providerQueryManager.maxInProcessRequests+1; i++ {
requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i]))
}
time.Sleep(20 * time.Millisecond)
fpn.queriesMadeMutex.Lock()
if fpn.liveQueries != defaultMaxInProcessRequests {
if fpn.liveQueries != providerQueryManager.maxInProcessRequests {
t.Logf("Queries made: %d\n", fpn.liveQueries)
t.Fatal("Did not limit parallel requests to rate limit")
}
fpn.queriesMadeMutex.Unlock()
for i := 0; i < defaultMaxInProcessRequests+1; i++ {
for i := 0; i < providerQueryManager.maxInProcessRequests+1; i++ {
for range requestChannels[i] {
}
}

fpn.queriesMadeMutex.Lock()
defer fpn.queriesMadeMutex.Unlock()
if fpn.queriesMade != defaultMaxInProcessRequests+1 {
if fpn.queriesMade != providerQueryManager.maxInProcessRequests+1 {
t.Logf("Queries made: %d\n", fpn.queriesMade)
t.Fatal("Did not make all separate requests")
}
Expand Down

0 comments on commit e739a0c

Please sign in to comment.