Skip to content

Commit

Permalink
Merge branch 'main' into gw-ipld-notfound
Browse files Browse the repository at this point in the history
  • Loading branch information
lidel authored Jul 30, 2024
2 parents 589f56f + 42c0c86 commit 28f4bc8
Show file tree
Hide file tree
Showing 27 changed files with 567 additions and 202 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ The following emojis are used to highlight certain changes:

### Changed

- `bitswap/server` minor memory use and performance improvements
- `bitswap` unify logger names to use uniform format bitswap/path/pkgname
- `gateway` now always returns meaningful cache-control headers for generated HTML listings of UnixFS directories

### Removed

### Fixed

- `boxo/gateway` now returns 404 Status Not Found instead of 500 when the requested data cannot be found, without a fallback on bitswap or similar restriction.
- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth.

### Security

Expand All @@ -31,7 +36,7 @@ The following emojis are used to highlight certain changes:
### Changed

- `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
- `bitswap/client` supports additional tracing
- `bitswap/client` supports additional tracing

### Removed

Expand All @@ -41,6 +46,7 @@ The following emojis are used to highlight certain changes:

- `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
- `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.

## [v0.20.0]

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Boxo includes high-quality components useful for interacting with IPFS protocols
- Interacting with public and private IPFS networks
- Working with content-addressed data

Boxo aims to provide a cohesive interface into these components. Note that not all of the underlying components necessarily reside in this respository.
Boxo aims to provide a cohesive interface into these components. Note that not all of the underlying components necessarily reside in this repository.

### Does Boxo == IPFS?

Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("bitswap-client")
var log = logging.Logger("bitswap/client")

// Option defines the functional option type that can be used to configure
// bitswap instances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ type BlockPresenceManager struct {
}

func New() *BlockPresenceManager {
return &BlockPresenceManager{
presence: make(map[cid.Cid]map[peer.ID]bool),
}
return &BlockPresenceManager{}
}

// ReceiveFrom is called when a peer sends us information about which blocks
Expand All @@ -26,6 +24,10 @@ func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHav
bpm.Lock()
defer bpm.Unlock()

if bpm.presence == nil {
bpm.presence = make(map[cid.Cid]map[peer.ID]bool)
}

for _, c := range haves {
bpm.updateBlockPresence(p, c, true)
}
Expand Down Expand Up @@ -75,6 +77,10 @@ func (bpm *BlockPresenceManager) AllPeersDoNotHaveBlock(peers []peer.ID, ks []ci
bpm.RLock()
defer bpm.RUnlock()

if len(bpm.presence) == 0 {
return nil
}

var res []cid.Cid
for _, c := range ks {
if bpm.allDontHave(peers, c) {
Expand All @@ -90,6 +96,9 @@ func (bpm *BlockPresenceManager) allDontHave(peers []peer.ID, c cid.Cid) bool {
if !cok {
return false
}
if len(ps) == 0 {
return false
}

// Check if we explicitly know that all the given peers do not have the cid
for _, p := range peers {
Expand All @@ -108,6 +117,25 @@ func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) {
for _, c := range ks {
delete(bpm.presence, c)
}
if len(bpm.presence) == 0 {
bpm.presence = nil
}
}

// RemovePeer removes the given peer from every cid key in the presence map.
func (bpm *BlockPresenceManager) RemovePeer(p peer.ID) {
bpm.Lock()
defer bpm.Unlock()

for c, pm := range bpm.presence {
delete(pm, p)
if len(pm) == 0 {
delete(bpm.presence, c)
}
}
if len(bpm.presence) == 0 {
bpm.presence = nil
}
}

// HasKey indicates whether the BlockPresenceManager is tracking the given key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ func TestBlockPresenceManager(t *testing.T) {
if bpm.PeerDoesNotHaveBlock(p, c1) {
t.Fatal(expDoesNotHaveFalseMsg)
}

bpm.ReceiveFrom(p, []cid.Cid{c0}, []cid.Cid{c1})
if !bpm.PeerHasBlock(p, c0) {
t.Fatal(expHasTrueMsg)
}
if !bpm.PeerDoesNotHaveBlock(p, c1) {
t.Fatal(expDoesNotHaveTrueMsg)
}
bpm.RemovePeer(p)
if bpm.PeerHasBlock(p, c0) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p, c0) {
t.Fatal(expDoesNotHaveFalseMsg)
}
if bpm.PeerHasBlock(p, c1) {
t.Fatal(expHasFalseMsg)
}
if bpm.PeerDoesNotHaveBlock(p, c1) {
t.Fatal(expDoesNotHaveFalseMsg)
}
}

func TestAddRemoveMulti(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
ipld "github.com/ipfs/go-ipld-format"
)

var log = logging.Logger("bitswap")
var log = logging.Logger("bitswap/client/getter")

// GetBlocksFunc is any function that can take an array of CIDs and return a
// channel of incoming blocks.
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

var (
log = logging.Logger("bitswap")
log = logging.Logger("bitswap/client/msgq")
sflog = log.Desugar()
)

Expand Down
37 changes: 17 additions & 20 deletions bitswap/client/internal/messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
bsmsg "github.com/ipfs/boxo/bitswap/message"
pb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/internal/test"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const collectTimeout = 200 * time.Millisecond

type fakeMessageNetwork struct {
connectError error
messageSenderError error
Expand Down Expand Up @@ -172,7 +173,7 @@ func TestStartupAndShutdown(t *testing.T) {

messageQueue.Startup()
messageQueue.AddBroadcastWantHaves(bcstwh)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for broadcast want-haves")
}
Expand Down Expand Up @@ -212,16 +213,14 @@ func TestSendingMessagesDeduped(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks, wantHaves)
messageQueue.AddWants(wantBlocks, wantHaves)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
t.Fatal("Messages were not deduped")
}
}

func TestSendingMessagesPartialDupe(t *testing.T) {
test.Flaky(t)

ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
Expand All @@ -235,16 +234,14 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks[:8], wantHaves[:8])
messageQueue.AddWants(wantBlocks[3:], wantHaves[3:])
messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
t.Fatal("messages were not correctly deduped")
}
}

func TestSendingMessagesPriority(t *testing.T) {
test.Flaky(t)

ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
Expand All @@ -262,7 +259,7 @@ func TestSendingMessagesPriority(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks1, wantHaves1)
messageQueue.AddWants(wantBlocks2, wantHaves2)
messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
t.Fatal("wrong number of wants")
Expand Down Expand Up @@ -327,7 +324,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks, wantHaves)
messageQueue.AddCancels(cancels)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks)-len(cancels) {
t.Fatal("Wrong message count")
Expand All @@ -351,7 +348,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
// Cancel the remaining want-blocks and want-haves
cancels = append(wantHaves, wantBlocks...)
messageQueue.AddCancels(cancels)
messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, collectTimeout)

// The remaining 2 cancels should be sent to the network as they are for
// wants that were sent to the network
Expand Down Expand Up @@ -379,7 +376,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
// Add 1 want-block and 2 want-haves
messageQueue.AddWants(wantBlocks, wantHaves)

messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)
if totalEntriesLength(messages) != len(wantBlocks)+len(wantHaves) {
t.Fatal("Wrong message count", totalEntriesLength(messages))
}
Expand All @@ -389,7 +386,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
// Override one cancel with a want-block (before cancel is sent to network)
messageQueue.AddWants(cids[:1], []cid.Cid{})

messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, collectTimeout)
if totalEntriesLength(messages) != 3 {
t.Fatal("Wrong message count", totalEntriesLength(messages))
}
Expand Down Expand Up @@ -531,7 +528,7 @@ func TestSendingLargeMessages(t *testing.T) {

messageQueue.Startup()
messageQueue.AddWants(wantBlocks, []cid.Cid{})
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)

// want-block has size 44, so with maxMsgSize 44 * 3 (3 want-blocks), then if
// we send 10 want-blocks we should expect 4 messages:
Expand Down Expand Up @@ -563,7 +560,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
// Check broadcast want-haves
bcwh := testutil.GenerateCids(10)
messageQueue.AddBroadcastWantHaves(bcwh)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)

if len(messages) != 1 {
t.Fatal("wrong number of messages were sent", len(messages))
Expand All @@ -582,7 +579,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
wbs := testutil.GenerateCids(10)
whs := testutil.GenerateCids(10)
messageQueue.AddWants(wbs, whs)
messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, collectTimeout)

if len(messages) != 1 {
t.Fatal("wrong number of messages were sent", len(messages))
Expand Down Expand Up @@ -612,7 +609,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {

wbs := testutil.GenerateCids(10)
messageQueue.AddWants(wbs, nil)
collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
collectMessages(ctx, t, messagesSent, collectTimeout)

// Check want-blocks are added to DontHaveTimeoutMgr
if dhtm.pendingCount() != len(wbs) {
Expand All @@ -621,7 +618,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {

cancelCount := 2
messageQueue.AddCancels(wbs[:cancelCount])
collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
collectMessages(ctx, t, messagesSent, collectTimeout)

// Check want-blocks are removed from DontHaveTimeoutMgr
if dhtm.pendingCount() != len(wbs)-cancelCount {
Expand Down Expand Up @@ -692,9 +689,9 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {

cids := testutil.GenerateCids(2)

// Add some wants and wait 10ms
// Add some wants and wait
messageQueue.AddWants(cids, nil)
collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
collectMessages(ctx, t, messagesSent, collectTimeout)

// Receive a response for the wants
messageQueue.ResponseReceived(cids)
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
peer "github.com/libp2p/go-libp2p/core/peer"
)

var log = logging.Logger("bs:peermgr")
var log = logging.Logger("bitswap/client/peermgr")

// PeerQueue provides a queue of messages to be sent for a single peer.
type PeerQueue interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("bitswap")
var log = logging.Logger("bitswap/client/provqrymgr")

const (
maxProviders = 10
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
log = logging.Logger("bs:sess")
log = logging.Logger("bitswap/session")
sflog = log.Desugar()
)

Expand Down
1 change: 1 addition & 0 deletions bitswap/client/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
go func() {
for p := range prunePeers {
// Peer doesn't have anything we want, so remove it
sws.bpm.RemovePeer(p)
log.Infof("peer %s sent too many dont haves, removing from session %d", p, sws.ID())
sws.SignalAvailability(p, false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
peer "github.com/libp2p/go-libp2p/core/peer"
)

var log = logging.Logger("bs:sprmgr")
var log = logging.Logger("bitswap/client/sesspeermgr")

const (
// Connection Manager tag value for session peers. Indicates to connection
Expand Down
Loading

0 comments on commit 28f4bc8

Please sign in to comment.