Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bitswap/client/providerquerymanager): don't end trace span until … #725

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@

var span trace.Span
sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k)))
defer span.End()

select {
case pqm.providerQueryMessages <- &newProvideQueryMessage{
Expand All @@ -137,6 +136,7 @@
case <-pqm.ctx.Done():
ch := make(chan peer.ID)
close(ch)
span.End()
return ch
case <-sessionCtx.Done():
ch := make(chan peer.ID)
Expand All @@ -152,14 +152,15 @@
case <-pqm.ctx.Done():
ch := make(chan peer.ID)
close(ch)
span.End()

Check warning on line 155 in bitswap/client/internal/providerquerymanager/providerquerymanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/providerquerymanager/providerquerymanager.go#L155

Added line #L155 was not covered by tests
return ch
case receivedInProgressRequest = <-inProgressRequestChan:
}

return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest)
return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest, func() { span.End() })
}

func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest) <-chan peer.ID {
func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.ID {
// maintains an unbuffered queue for incoming providers for given request for a given session
// essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all
// sessions that queried that CID, without worrying about whether the client code is actually
Expand All @@ -171,6 +172,7 @@

go func() {
defer close(returnedProviders)
defer onCloseFn()
outgoingProviders := func() chan<- peer.ID {
if len(receivedProviders) == 0 {
return nil
Expand Down