diff --git a/CHANGELOG.md b/CHANGELOG.md index a213db637..b70004620 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,16 @@ The following emojis are used to highlight certain changes: ### Changed +- `bitswap/server` minor memory use and performance improvements +- `bitswap` unify logger names to use uniform format bitswap/path/pkgname +- `gateway` now always returns meaningful cache-control headers for generated HTML listings of UnixFS directories + ### Removed ### Fixed - `boxo/gateway` now returns 404 Status Not Found instead of 500 when the requested data cannot be found, without a fallback on bitswap or similar restriction. +- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. ### Security @@ -31,7 +36,7 @@ The following emojis are used to highlight certain changes: ### Changed - `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0) -- `bitswap/client` supports additional tracing +- `bitswap/client` supports additional tracing ### Removed @@ -41,6 +46,7 @@ The following emojis are used to highlight certain changes: - `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found - `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup +- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. ## [v0.20.0] diff --git a/README.md b/README.md index bd824f512..ed8ac109f 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ Boxo includes high-quality components useful for interacting with IPFS protocols - Interacting with public and private IPFS networks - Working with content-addressed data -Boxo aims to provide a cohesive interface into these components. Note that not all of the underlying components necessarily reside in this respository. +Boxo aims to provide a cohesive interface into these components. Note that not all of the underlying components necessarily reside in this repository. ### Does Boxo == IPFS? 
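Editor's note on the `bitswap/client` memory-leak entry above: the BlockPresenceManager change immediately below stops its nested maps from growing without bound. A minimal, illustrative Go sketch of the same pattern (lazy allocation, releasing maps once they are empty); the `tracker` type and its methods are hypothetical stand-ins, not boxo APIs:

```go
package main

import "fmt"

// tracker is an illustrative stand-in for a structure like
// BlockPresenceManager: a map of maps that previously only grew.
type tracker struct {
	presence map[string]map[string]bool
}

// add allocates the outer map lazily, so an idle tracker holds no memory.
func (t *tracker) add(key, peer string, have bool) {
	if t.presence == nil {
		t.presence = make(map[string]map[string]bool)
	}
	peers, ok := t.presence[key]
	if !ok {
		peers = make(map[string]bool)
		t.presence[key] = peers
	}
	peers[peer] = have
}

// removePeer deletes the peer from every key and drops empty inner maps,
// mirroring the new RemovePeer method so entries cannot accumulate forever.
func (t *tracker) removePeer(peer string) {
	for key, peers := range t.presence {
		delete(peers, peer)
		if len(peers) == 0 {
			delete(t.presence, key)
		}
	}
	if len(t.presence) == 0 {
		t.presence = nil // let the GC reclaim the backing storage
	}
}

func main() {
	var t tracker
	t.add("cid-1", "peer-a", true)
	t.removePeer("peer-a")
	fmt.Println(t.presence == nil) // true: nothing retained after the peer leaves
}
```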
diff --git a/bitswap/client/client.go b/bitswap/client/client.go index aa9ab78fa..0a5bdeb9e 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -38,7 +38,7 @@ import ( "go.opentelemetry.io/otel/trace" ) -var log = logging.Logger("bitswap-client") +var log = logging.Logger("bitswap/client") // Option defines the functional option type that can be used to configure // bitswap instances diff --git a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go index 1b76acc5b..685981381 100644 --- a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go +++ b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go @@ -15,9 +15,7 @@ type BlockPresenceManager struct { } func New() *BlockPresenceManager { - return &BlockPresenceManager{ - presence: make(map[cid.Cid]map[peer.ID]bool), - } + return &BlockPresenceManager{} } // ReceiveFrom is called when a peer sends us information about which blocks @@ -26,6 +24,10 @@ func (bpm *BlockPresenceManager) ReceiveFrom(p peer.ID, haves []cid.Cid, dontHav bpm.Lock() defer bpm.Unlock() + if bpm.presence == nil { + bpm.presence = make(map[cid.Cid]map[peer.ID]bool) + } + for _, c := range haves { bpm.updateBlockPresence(p, c, true) } @@ -75,6 +77,10 @@ func (bpm *BlockPresenceManager) AllPeersDoNotHaveBlock(peers []peer.ID, ks []ci bpm.RLock() defer bpm.RUnlock() + if len(bpm.presence) == 0 { + return nil + } + var res []cid.Cid for _, c := range ks { if bpm.allDontHave(peers, c) { @@ -90,6 +96,9 @@ func (bpm *BlockPresenceManager) allDontHave(peers []peer.ID, c cid.Cid) bool { if !cok { return false } + if len(ps) == 0 { + return false + } // Check if we explicitly know that all the given peers do not have the cid for _, p := range peers { @@ -108,6 +117,25 @@ func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) { for _, c := range ks { delete(bpm.presence, c) } + if len(bpm.presence) == 0 { + bpm.presence = nil + } +} + +// RemovePeer removes the given peer from every cid key in the presence map. 
+func (bpm *BlockPresenceManager) RemovePeer(p peer.ID) { + bpm.Lock() + defer bpm.Unlock() + + for c, pm := range bpm.presence { + delete(pm, p) + if len(pm) == 0 { + delete(bpm.presence, c) + } + } + if len(bpm.presence) == 0 { + bpm.presence = nil + } } // HasKey indicates whether the BlockPresenceManager is tracking the given key diff --git a/bitswap/client/internal/blockpresencemanager/blockpresencemanager_test.go b/bitswap/client/internal/blockpresencemanager/blockpresencemanager_test.go index b977c28ff..0a1ba7d80 100644 --- a/bitswap/client/internal/blockpresencemanager/blockpresencemanager_test.go +++ b/bitswap/client/internal/blockpresencemanager/blockpresencemanager_test.go @@ -93,6 +93,27 @@ func TestBlockPresenceManager(t *testing.T) { if bpm.PeerDoesNotHaveBlock(p, c1) { t.Fatal(expDoesNotHaveFalseMsg) } + + bpm.ReceiveFrom(p, []cid.Cid{c0}, []cid.Cid{c1}) + if !bpm.PeerHasBlock(p, c0) { + t.Fatal(expHasTrueMsg) + } + if !bpm.PeerDoesNotHaveBlock(p, c1) { + t.Fatal(expDoesNotHaveTrueMsg) + } + bpm.RemovePeer(p) + if bpm.PeerHasBlock(p, c0) { + t.Fatal(expHasFalseMsg) + } + if bpm.PeerDoesNotHaveBlock(p, c0) { + t.Fatal(expDoesNotHaveFalseMsg) + } + if bpm.PeerHasBlock(p, c1) { + t.Fatal(expHasFalseMsg) + } + if bpm.PeerDoesNotHaveBlock(p, c1) { + t.Fatal(expDoesNotHaveFalseMsg) + } } func TestAddRemoveMulti(t *testing.T) { diff --git a/bitswap/client/internal/getter/getter.go b/bitswap/client/internal/getter/getter.go index 822d319b7..c03b2aecc 100644 --- a/bitswap/client/internal/getter/getter.go +++ b/bitswap/client/internal/getter/getter.go @@ -13,7 +13,7 @@ import ( ipld "github.com/ipfs/go-ipld-format" ) -var log = logging.Logger("bitswap") +var log = logging.Logger("bitswap/client/getter") // GetBlocksFunc is any function that can take an array of CIDs and return a // channel of incoming blocks. 
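The logger renames above and below change the subsystem names that operators filter on (`bitswap-client` becomes `bitswap/client`, the getter's `bitswap` becomes `bitswap/client/getter`, and so on). A hedged sketch of how a consumer might adjust levels for the new names with go-log v2; it assumes go-log's `SetLogLevel`/`SetLogLevelRegex` helpers, and the same effect can typically be achieved with the `GOLOG_LOG_LEVEL` environment variable (for example `GOLOG_LOG_LEVEL="error,bitswap/client=debug"`):

```go
package main

import (
	logging "github.com/ipfs/go-log/v2"
)

func main() {
	// Raise a single renamed subsystem, e.g. the client logger that used to
	// be "bitswap-client" and is now "bitswap/client".
	if err := logging.SetLogLevel("bitswap/client", "debug"); err != nil {
		panic(err)
	}

	// Or match every bitswap subsystem at once, now that the names share a
	// common "bitswap/" prefix.
	if err := logging.SetLogLevelRegex("bitswap/.*", "info"); err != nil {
		panic(err)
	}
}
```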
diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 4f90f239b..fac72f7cd 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -19,7 +19,7 @@ import ( ) var ( - log = logging.Logger("bitswap") + log = logging.Logger("bitswap/client/msgq") sflog = log.Desugar() ) diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go index 4d361c5d5..e9b8f7c54 100644 --- a/bitswap/client/internal/messagequeue/messagequeue_test.go +++ b/bitswap/client/internal/messagequeue/messagequeue_test.go @@ -14,12 +14,13 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" pb "github.com/ipfs/boxo/bitswap/message/pb" bsnet "github.com/ipfs/boxo/bitswap/network" - "github.com/ipfs/boxo/internal/test" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) +const collectTimeout = 200 * time.Millisecond + type fakeMessageNetwork struct { connectError error messageSenderError error @@ -172,7 +173,7 @@ func TestStartupAndShutdown(t *testing.T) { messageQueue.Startup() messageQueue.AddBroadcastWantHaves(bcstwh) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if len(messages) != 1 { t.Fatal("wrong number of messages were sent for broadcast want-haves") } @@ -212,7 +213,7 @@ func TestSendingMessagesDeduped(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks, wantHaves) messageQueue.AddWants(wantBlocks, wantHaves) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) { t.Fatal("Messages were not deduped") @@ -220,8 +221,6 @@ func TestSendingMessagesDeduped(t *testing.T) { } func TestSendingMessagesPartialDupe(t *testing.T) { - test.Flaky(t) - ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) @@ -235,7 +234,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks[:8], wantHaves[:8]) messageQueue.AddWants(wantBlocks[3:], wantHaves[3:]) - messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) { t.Fatal("messages were not correctly deduped") @@ -243,8 +242,6 @@ func TestSendingMessagesPartialDupe(t *testing.T) { } func TestSendingMessagesPriority(t *testing.T) { - test.Flaky(t) - ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) @@ -262,7 +259,7 @@ func TestSendingMessagesPriority(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks1, wantHaves1) messageQueue.AddWants(wantBlocks2, wantHaves2) - messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) { t.Fatal("wrong number of wants") @@ -327,7 +324,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks, wantHaves) messageQueue.AddCancels(cancels) - messages := collectMessages(ctx, t, 
messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks)-len(cancels) { t.Fatal("Wrong message count") @@ -351,7 +348,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { // Cancel the remaining want-blocks and want-haves cancels = append(wantHaves, wantBlocks...) messageQueue.AddCancels(cancels) - messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, collectTimeout) // The remaining 2 cancels should be sent to the network as they are for // wants that were sent to the network @@ -379,7 +376,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { // Add 1 want-block and 2 want-haves messageQueue.AddWants(wantBlocks, wantHaves) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != len(wantBlocks)+len(wantHaves) { t.Fatal("Wrong message count", totalEntriesLength(messages)) } @@ -389,7 +386,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { // Override one cancel with a want-block (before cancel is sent to network) messageQueue.AddWants(cids[:1], []cid.Cid{}) - messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != 3 { t.Fatal("Wrong message count", totalEntriesLength(messages)) } @@ -531,7 +528,7 @@ func TestSendingLargeMessages(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks, []cid.Cid{}) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) // want-block has size 44, so with maxMsgSize 44 * 3 (3 want-blocks), then if // we send 10 want-blocks we should expect 4 messages: @@ -563,7 +560,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { // Check broadcast want-haves bcwh := testutil.GenerateCids(10) messageQueue.AddBroadcastWantHaves(bcwh) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if len(messages) != 1 { t.Fatal("wrong number of messages were sent", len(messages)) @@ -582,7 +579,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { wbs := testutil.GenerateCids(10) whs := testutil.GenerateCids(10) messageQueue.AddWants(wbs, whs) - messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, collectTimeout) if len(messages) != 1 { t.Fatal("wrong number of messages were sent", len(messages)) @@ -612,7 +609,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { wbs := testutil.GenerateCids(10) messageQueue.AddWants(wbs, nil) - collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + collectMessages(ctx, t, messagesSent, collectTimeout) // Check want-blocks are added to DontHaveTimeoutMgr if dhtm.pendingCount() != len(wbs) { @@ -621,7 +618,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { cancelCount := 2 messageQueue.AddCancels(wbs[:cancelCount]) - collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + collectMessages(ctx, t, messagesSent, collectTimeout) // Check want-blocks are removed from DontHaveTimeoutMgr if dhtm.pendingCount() != len(wbs)-cancelCount { @@ -692,9 +689,9 @@ func 
TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { cids := testutil.GenerateCids(2) - // Add some wants and wait 10ms + // Add some wants and wait messageQueue.AddWants(cids, nil) - collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + collectMessages(ctx, t, messagesSent, collectTimeout) // Receive a response for the wants messageQueue.ResponseReceived(cids) diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go index f26b8fbec..25cdd605f 100644 --- a/bitswap/client/internal/peermanager/peermanager.go +++ b/bitswap/client/internal/peermanager/peermanager.go @@ -11,7 +11,7 @@ import ( peer "github.com/libp2p/go-libp2p/core/peer" ) -var log = logging.Logger("bs:peermgr") +var log = logging.Logger("bitswap/client/peermgr") // PeerQueue provides a queue of messages to be sent for a single peer. type PeerQueue interface { diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index 9bba5211f..ea10a40e5 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/otel/trace" ) -var log = logging.Logger("bitswap") +var log = logging.Logger("bitswap/client/provqrymgr") const ( maxProviders = 10 diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index b77a82283..6f99dec0e 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -20,7 +20,7 @@ import ( ) var ( - log = logging.Logger("bs:sess") + log = logging.Logger("bitswap/session") sflog = log.Desugar() ) diff --git a/bitswap/client/internal/session/sessionwantsender.go b/bitswap/client/internal/session/sessionwantsender.go index 390fdf29d..1beefeb94 100644 --- a/bitswap/client/internal/session/sessionwantsender.go +++ b/bitswap/client/internal/session/sessionwantsender.go @@ -455,6 +455,7 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid { go func() { for p := range prunePeers { // Peer doesn't have anything we want, so remove it + sws.bpm.RemovePeer(p) log.Infof("peer %s sent too many dont haves, removing from session %d", p, sws.ID()) sws.SignalAvailability(p, false) } diff --git a/bitswap/client/internal/sessionpeermanager/sessionpeermanager.go b/bitswap/client/internal/sessionpeermanager/sessionpeermanager.go index f15be86b4..1832e9c7f 100644 --- a/bitswap/client/internal/sessionpeermanager/sessionpeermanager.go +++ b/bitswap/client/internal/sessionpeermanager/sessionpeermanager.go @@ -9,7 +9,7 @@ import ( peer "github.com/libp2p/go-libp2p/core/peer" ) -var log = logging.Logger("bs:sprmgr") +var log = logging.Logger("bitswap/client/sesspeermgr") const ( // Connection Manager tag value for session peers. 
Indicates to connection diff --git a/bitswap/message/message.go b/bitswap/message/message.go index 6b9d787e7..a0a45970b 100644 --- a/bitswap/message/message.go +++ b/bitswap/message/message.go @@ -182,15 +182,9 @@ func (m *impl) Clone() BitSwapMessage { // Reset the values in the message back to defaults, so it can be reused func (m *impl) Reset(full bool) { m.full = full - for k := range m.wantlist { - delete(m.wantlist, k) - } - for k := range m.blocks { - delete(m.blocks, k) - } - for k := range m.blockPresences { - delete(m.blockPresences, k) - } + clear(m.wantlist) + clear(m.blocks) + clear(m.blockPresences) m.pendingBytes = 0 } @@ -253,25 +247,31 @@ func (m *impl) Empty() bool { } func (m *impl) Wantlist() []Entry { - out := make([]Entry, 0, len(m.wantlist)) + out := make([]Entry, len(m.wantlist)) + var i int for _, e := range m.wantlist { - out = append(out, *e) + out[i] = *e + i++ } return out } func (m *impl) Blocks() []blocks.Block { - bs := make([]blocks.Block, 0, len(m.blocks)) + bs := make([]blocks.Block, len(m.blocks)) + var i int for _, block := range m.blocks { - bs = append(bs, block) + bs[i] = block + i++ } return bs } func (m *impl) BlockPresences() []BlockPresence { - bps := make([]BlockPresence, 0, len(m.blockPresences)) + bps := make([]BlockPresence, len(m.blockPresences)) + var i int for c, t := range m.blockPresences { - bps = append(bps, BlockPresence{c, t}) + bps[i] = BlockPresence{c, t} + i++ } return bps } diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go index 962bc2588..6ea0fc525 100644 --- a/bitswap/network/interface.go +++ b/bitswap/network/interface.go @@ -19,7 +19,7 @@ var ( ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers // ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero - // ProtocolBitswapOneOne is the the prefix for version 1.1.0 + // ProtocolBitswapOneOne is the prefix for version 1.1.0 ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne // ProtocolBitswap is the current version of the bitswap protocol: 1.2.0 ProtocolBitswap = internal.ProtocolBitswap diff --git a/bitswap/network/internal/default.go b/bitswap/network/internal/default.go index 13f4936a8..ee5a8974e 100644 --- a/bitswap/network/internal/default.go +++ b/bitswap/network/internal/default.go @@ -9,7 +9,7 @@ var ( ProtocolBitswapNoVers protocol.ID = "/ipfs/bitswap" // ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol ProtocolBitswapOneZero protocol.ID = "/ipfs/bitswap/1.0.0" - // ProtocolBitswapOneOne is the the prefix for version 1.1.0 + // ProtocolBitswapOneOne is the prefix for version 1.1.0 ProtocolBitswapOneOne protocol.ID = "/ipfs/bitswap/1.1.0" // ProtocolBitswap is the current version of the bitswap protocol: 1.2.0 ProtocolBitswap protocol.ID = "/ipfs/bitswap/1.2.0" diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index 422249952..ac9ab66a6 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -26,7 +26,7 @@ import ( "github.com/multiformats/go-multistream" ) -var log = logging.Logger("bitswap_network") +var log = logging.Logger("bitswap/network") var connectTimeout = time.Second * 5 diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 234c1c510..a40345d8f 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -2,14 +2,15 @@ package decision import ( + "cmp" "context" + "errors" "fmt" - "math/bits" + 
"slices" "sync" "time" "github.com/google/uuid" - wl "github.com/ipfs/boxo/bitswap/client/wantlist" "github.com/ipfs/boxo/bitswap/internal/defaults" bsmsg "github.com/ipfs/boxo/bitswap/message" @@ -60,7 +61,7 @@ import ( // whatever it sees fit to produce desired outcomes (get wanted keys // quickly, maintain good relationships with peers, etc). -var log = logging.Logger("engine") +var log = logging.Logger("bitswap/server/decision") const ( // outboxChanBuffer must be 0 to prevent stale messages from being sent @@ -132,9 +133,11 @@ type PeerEntry struct { // PeerLedger is an external ledger dealing with peers and their want lists. type PeerLedger interface { // Wants informs the ledger that [peer.ID] wants [wl.Entry]. - Wants(p peer.ID, e wl.Entry) + // If peer ledger exceed internal limit, then the entry is not added + // and false is returned. + Wants(p peer.ID, e wl.Entry) bool - // CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID]. + // CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID]. CancelWant(p peer.ID, k cid.Cid) bool // CancelWantWithType will not cancel WantBlock if we sent a HAVE message. @@ -315,8 +318,11 @@ func WithMaxOutstandingBytesPerPeer(count int) Option { } } -// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send. -// If a peer send us more than this we will truncate newest entries. +// WithMaxQueuedWantlistEntriesPerPeer limits how many individual entries each +// peer is allowed to send. If a peer sends more than this, then the lowest +// priority entries are truncated to this limit. If there is insufficient space +// to enqueue new entries, then older existing wants with no associated blocks, +// and lower priority wants, are canceled to make room for the new wants. func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option { return func(e *Engine) { e.maxQueuedWantlistEntriesPerPeer = count @@ -402,7 +408,6 @@ func newEngine( taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, self: self, - peerLedger: NewDefaultPeerLedger(), pendingGauge: bmetrics.PendingEngineGauge(ctx), activeGauge: bmetrics.ActiveEngineGauge(ctx), targetMessageSize: defaultTargetMessageSize, @@ -416,6 +421,11 @@ func newEngine( opt(e) } + // If peerLedger was not set by option, then create a default instance. + if e.peerLedger == nil { + e.peerLedger = NewDefaultPeerLedger(e.maxQueuedWantlistEntriesPerPeer) + } + e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx)) // default peer task queue options @@ -668,37 +678,20 @@ func (e *Engine) Peers() []peer.ID { // MessageReceived is called when a message is received from a remote peer. // For each item in the wantlist, add a want-have or want-block entry to the -// request queue (this is later popped off by the workerTasks) -func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) (mustKillConnection bool) { - entries := m.Wantlist() - - if len(entries) > 0 { - log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries)) - for _, et := range entries { - if !et.Cancel { - if et.WantType == pb.Message_Wantlist_Have { - log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid) - } else { - log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid) - } - } - } - } - +// request queue (this is later popped off by the workerTasks). 
Returns true + if the connection to the server must be closed. +func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) bool { if m.Empty() { log.Infof("received empty message from %s", p) + return false } - newWorkExists := false - defer func() { - if newWorkExists { - e.signalNewWork() - } - }() - - // Dispatch entries - wants, cancels := e.splitWantsCancels(entries) - wants, denials := e.splitWantsDenials(p, wants) + wants, cancels, denials, err := e.splitWantsCancelsDenials(p, m) + if err != nil { + // This is a truly broken client, let's kill the connection. + log.Warnw(err.Error(), "local", e.self, "remote", p) + return true + } // Get block sizes wantKs := cid.NewSet() @@ -708,7 +701,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys()) if err != nil { log.Info("aborting message processing", err) - return + return false } e.lock.Lock() @@ -717,56 +710,35 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap e.peerLedger.ClearPeerWantlist(p) } - s := uint(e.peerLedger.WantlistSizeForPeer(p)) - if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer { - log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe) - // truncate wantlist to avoid overflow - available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0) - if o != 0 { - available = 0 + var overflow []bsmsg.Entry + if len(wants) != 0 { + filteredWants := wants[:0] // shift inplace + for _, entry := range wants { + if !e.peerLedger.Wants(p, entry.Entry) { + // Cannot add entry because it would exceed size limit. + overflow = append(overflow, entry) + continue + } + filteredWants = append(filteredWants, entry) } - wants = wants[:available] + // Clear truncated entries - early GC. + clear(wants[len(filteredWants):]) + wants = filteredWants } - filteredWants := wants[:0] // shift inplace - - for _, entry := range wants { - if entry.Cid.Prefix().MhType == mh.IDENTITY { - // This is a truely broken client, let's kill the connection. - e.lock.Unlock() - log.Warnw("peer wants an identity CID", "local", e.self, "remote", p) - return true - } - if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { - // Ignore requests about CIDs that big. - continue - } - - e.peerLedger.Wants(p, entry.Entry) - filteredWants = append(filteredWants, entry) - } - clear := wants[len(filteredWants):] - for i := range clear { - clear[i] = bsmsg.Entry{} // early GC + if len(overflow) != 0 { + log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow)) + wants = e.handleOverflow(ctx, p, overflow, wants) } - wants = filteredWants - for _, entry := range cancels { - if entry.Cid.Prefix().MhType == mh.IDENTITY { - // This is a truely broken client, let's kill the connection. - e.lock.Unlock() - log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p) - return true - } - if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { - // Ignore requests about CIDs that big.
- continue - } - log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid) - if e.peerLedger.CancelWant(p, entry.Cid) { - e.peerRequestQueue.Remove(entry.Cid, p) + for _, entry := range cancels { + c := entry.Cid + log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c) + if e.peerLedger.CancelWant(p, c) { + e.peerRequestQueue.Remove(c, p) } } + e.lock.Unlock() var activeEntries []peertask.Task @@ -776,13 +748,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // Only add the task to the queue if the requester wants a DONT_HAVE if e.sendDontHaves && entry.SendDontHave { c := entry.Cid - - newWorkExists = true - isWantBlock := false - if entry.WantType == pb.Message_Wantlist_Block { - isWantBlock = true - } - activeEntries = append(activeEntries, peertask.Task{ Topic: c, Priority: int(entry.Priority), @@ -790,7 +755,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap Data: &taskData{ BlockSize: 0, HaveBlock: false, - IsWantBlock: isWantBlock, + IsWantBlock: entry.WantType == pb.Message_Wantlist_Block, SendDontHave: entry.SendDontHave, }, }) @@ -806,82 +771,177 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // For each want-have / want-block for _, entry := range wants { c := entry.Cid - blockSize, found := blockSizes[entry.Cid] + blockSize, found := blockSizes[c] // If the block was not found if !found { - log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) + log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", c, "sendDontHave", entry.SendDontHave) sendDontHave(entry) - } else { - // The block was found, add it to the queue - newWorkExists = true - - isWantBlock := e.sendAsBlock(entry.WantType, blockSize) - - log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock) - - // entrySize is the amount of space the entry takes up in the - // message we send to the recipient. If we're sending a block, the - // entrySize is the size of the block. Otherwise it's the size of - // a block presence entry. - entrySize := blockSize - if !isWantBlock { - entrySize = bsmsg.BlockPresenceSize(c) - } - activeEntries = append(activeEntries, peertask.Task{ - Topic: c, - Priority: int(entry.Priority), - Work: entrySize, - Data: &taskData{ - BlockSize: blockSize, - HaveBlock: true, - IsWantBlock: isWantBlock, - SendDontHave: entry.SendDontHave, - }, - }) + continue + } + // The block was found, add it to the queue + isWantBlock := e.sendAsBlock(entry.WantType, blockSize) + + log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock) + + // entrySize is the amount of space the entry takes up in the + // message we send to the recipient. If we're sending a block, the + // entrySize is the size of the block. Otherwise it's the size of + // a block presence entry. + entrySize := blockSize + if !isWantBlock { + entrySize = bsmsg.BlockPresenceSize(c) } + activeEntries = append(activeEntries, peertask.Task{ + Topic: c, + Priority: int(entry.Priority), + Work: entrySize, + Data: &taskData{ + BlockSize: blockSize, + HaveBlock: true, + IsWantBlock: isWantBlock, + SendDontHave: entry.SendDontHave, + }, + }) } - // Push entries onto the request queue - if len(activeEntries) > 0 { + // Push entries onto the request queue and signal network that new work is ready. 
+ if len(activeEntries) != 0 { e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...) e.updateMetrics() + e.signalNewWork() } return false } -// Split the want-have / want-block entries from the cancel entries -func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { - wants := make([]bsmsg.Entry, 0, len(es)) - cancels := make([]bsmsg.Entry, 0, len(es)) - for _, et := range es { - if et.Cancel { - cancels = append(cancels, et) - } else { - wants = append(wants, et) +// handleOverflow processes incoming wants that could not be added to the peer +// ledger without exceeding the peer want limit. These are handled by trying to +// make room by canceling existing wants for which there is no block. If this +// does not make sufficient room, then any lower priority wants that have +// blocks are canceled. +// +// Important: handleOverflow must be called while e.lock is held. +func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, overflow, wants []bsmsg.Entry) []bsmsg.Entry { + // Sort overflow from most to least important. + slices.SortFunc(overflow, func(a, b bsmsg.Entry) int { + return cmp.Compare(b.Entry.Priority, a.Entry.Priority) + }) + // Sort existing wants from least to most important, to try to replace + // lowest priority items first. + existingWants := e.peerLedger.WantlistForPeer(p) + slices.SortFunc(existingWants, func(a, b wl.Entry) int { + return cmp.Compare(a.Priority, b.Priority) + }) + + queuedWantKs := cid.NewSet() + for _, entry := range existingWants { + queuedWantKs.Add(entry.Cid) + } + queuedBlockSizes, err := e.bsm.getBlockSizes(ctx, queuedWantKs.Keys()) + if err != nil { + log.Info("aborting overflow processing", err) + return wants + } + + // Remove entries for blocks that are not present to make room for overflow. + var removed []int + for i, w := range existingWants { + if _, found := queuedBlockSizes[w.Cid]; !found { + // Cancel lowest priority dont-have. + if e.peerLedger.CancelWant(p, w.Cid) { + e.peerRequestQueue.Remove(w.Cid, p) + } + removed = append(removed, i) + // Pop highest priority overflow. + firstOver := overflow[0] + overflow = overflow[1:] + // Add highest priority overflow to wants. + e.peerLedger.Wants(p, firstOver.Entry) + wants = append(wants, firstOver) + if len(overflow) == 0 { + return wants + } } } - return wants, cancels + + // Replace existing entries that have lower priority with overflow + // entries. + var replace int + for _, overflowEnt := range overflow { + // Do not compare with removed existingWants entry. + for len(removed) != 0 && replace == removed[0] { + replace++ + removed = removed[1:] + } + if overflowEnt.Entry.Priority < existingWants[replace].Priority { + // All overflow entries have too low a priority to replace any + // existing wants. + break + } + entCid := existingWants[replace].Cid + replace++ + if e.peerLedger.CancelWant(p, entCid) { + e.peerRequestQueue.Remove(entCid, p) + } + e.peerLedger.Wants(p, overflowEnt.Entry) + wants = append(wants, overflowEnt) + } + + return wants } -// Split the want-have / want-block entries from the block that will be denied access -func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { - if e.peerBlockRequestFilter == nil { - return allWants, nil +// Split the want-have / want-block entries from the cancel and deny entries.
+func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) { + entries := m.Wantlist() // creates copy; safe to modify + if len(entries) == 0 { + return nil, nil, nil, nil } - wants := make([]bsmsg.Entry, 0, len(allWants)) - denied := make([]bsmsg.Entry, 0, len(allWants)) + log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries)) - for _, et := range allWants { - if e.peerBlockRequestFilter(p, et.Cid) { - wants = append(wants, et) + wants := entries[:0] // shift in-place + var cancels, denials []bsmsg.Entry + + for _, et := range entries { + c := et.Cid + if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize { + // Ignore requests about CIDs that big. + continue + } + if c.Prefix().MhType == mh.IDENTITY { + return nil, nil, nil, errors.New("peer canceled an identity CID") + } + + if et.Cancel { + cancels = append(cancels, et) + continue + } + + if et.WantType == pb.Message_Wantlist_Have { + log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c) } else { - denied = append(denied, et) + log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c) + } + + if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { + denials = append(denials, et) + continue + } + + // Do not take more wants than can be handled. + if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) { + wants = append(wants, et) } } - return wants, denied + if len(wants) == 0 { + wants = nil + } + + // Clear truncated entries. + clear(entries[len(wants):]) + + return wants, cancels, denials, nil } // ReceivedBlocks is called when new blocks are received from the network. diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index c25e3508d..b83342302 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/benbjohnson/clock" + wl "github.com/ipfs/boxo/bitswap/client/wantlist" "github.com/ipfs/boxo/bitswap/internal/testutil" message "github.com/ipfs/boxo/bitswap/message" pb "github.com/ipfs/boxo/bitswap/message/pb" @@ -1733,3 +1734,235 @@ func TestKillConnectionForInlineCid(t *testing.T) { t.Fatal("connection was not killed when receiving inline in cancel") } } + +func TestWantlistBlocked(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const limit = 32 + + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + + // Generate a set of blocks that the server has. + haveCids := make([]cid.Cid, limit) + var blockNum int + for blockNum < limit { + block := blocks.NewBlock([]byte(fmt.Sprint(blockNum))) + if blockNum != 0 { // do not put first block in blockstore.
+ if err := bs.Put(context.Background(), block); err != nil { + t.Fatal(err) + } + } + haveCids[blockNum] = block.Cid() + blockNum++ + } + + fpt := &fakePeerTagger{} + e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit)) + e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + warsaw := engineSet{ + Peer: peer.ID("warsaw"), + PeerTagger: fpt, + Blockstore: bs, + Engine: e, + } + riga := newTestEngine(ctx, "riga") + if warsaw.Peer == riga.Peer { + t.Fatal("Sanity Check: Peers have same Key!") + } + + m := message.New(false) + dontHaveCids := make([]cid.Cid, limit) + for i := 0; i < limit; i++ { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 1, pb.Message_Wantlist_Block, true) + dontHaveCids[i] = c + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl := warsaw.Engine.WantlistForPeer(riga.Peer) + // Check that all the dontHave wants are on the wantlist. + for _, c := range dontHaveCids { + if !findCid(c, wl) { + t.Fatal("Expected all dontHaveCids to be on wantlist") + } + } + t.Log("All", len(wl), "dont-have CIDs are on wantlist") + + m = message.New(false) + for _, c := range haveCids { + m.AddEntry(c, 1, pb.Message_Wantlist_Block, true) + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + // Check that all the have wants are on the wantlist. + for _, c := range haveCids { + if !findCid(c, wl) { + t.Fatal("Missing expected want. Expected all haveCids to be on wantlist") + } + } + t.Log("All", len(wl), "new have CIDs are now on wantlist") + + m = message.New(false) + for i := 0; i < limit; i++ { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 1, pb.Message_Wantlist_Block, true) + dontHaveCids[i] = c + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + // Check that all the new dontHave wants are not on the wantlist. + for _, c := range dontHaveCids { + if findCid(c, wl) { + t.Fatal("No new dontHaveCids should be on wantlist") + } + } + t.Log("All", len(wl), "new dont-have CIDs are not on wantlist") +} + +func TestWantlistOverflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const limit = 32 + + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + + origCids := make([]cid.Cid, limit) + var blockNum int + m := message.New(false) + for blockNum < limit { + block := blocks.NewBlock([]byte(fmt.Sprint(blockNum))) + if blockNum != 0 { // do not put first block in blockstore. + if err := bs.Put(context.Background(), block); err != nil { + t.Fatal(err) + } + } + m.AddEntry(block.Cid(), 1, pb.Message_Wantlist_Block, true) + origCids[blockNum] = block.Cid() + blockNum++ + } + + fpt := &fakePeerTagger{} + e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit)) + e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + warsaw := engineSet{ + Peer: peer.ID("warsaw"), + PeerTagger: fpt, + Blockstore: bs, + Engine: e, + } + riga := newTestEngine(ctx, "riga") + if warsaw.Peer == riga.Peer { + t.Fatal("Sanity Check: Peers have same Key!") + } + + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + // Check that the wantlist is at the size limit.
+ wl := warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist size", len(wl), "does not match limit", limit) + } + t.Log("Sent message with", limit, "medium-priority wants and", limit-1, "have blocks present") + + m = message.New(false) + lowPrioCids := make([]cid.Cid, 5) + for i := 0; i < cap(lowPrioCids); i++ { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) + lowPrioCids[i] = c + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist size", len(wl), "does not match limit", limit) + } + // Check that one low priority entry is on the wantlist, since there is one + // existing entry without a block and none at a lower priority. + var count int + for _, c := range lowPrioCids { + if findCid(c, wl) { + count++ + } + } + if count != 1 { + t.Fatal("Expected 1 low priority entry on wantlist, found", count) + } + t.Log("Sent message with", len(lowPrioCids), "low-priority wants. One accepted as replacement for existing want without block.") + + m = message.New(false) + highPrioCids := make([]cid.Cid, 5) + for i := 0; i < cap(highPrioCids); i++ { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 10, pb.Message_Wantlist_Block, true) + highPrioCids[i] = c + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist size", len(wl), "does not match limit", limit) + } + // Check that all high priority entries are on the wantlist, since there + // were existing entries with lower priority. + for _, c := range highPrioCids { + if !findCid(c, wl) { + t.Fatal("expected high priority entry on wantlist") + } + } + t.Log("Sent message with", len(highPrioCids), "high-priority wants. All accepted replacing wants without block or low priority.") + + // These new wants should overflow and some of them should replace existing + // wants that do not have blocks (the high-priority wants from the + // previous message). + m = message.New(false) + blockCids := make([]cid.Cid, len(highPrioCids)+2) + for i := 0; i < cap(blockCids); i++ { + c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid() + blockNum++ + m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) + blockCids[i] = c + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + if len(wl) != limit { + t.Fatal("wantlist size", len(wl), "does not match limit", limit) + } + + count = 0 + for _, c := range blockCids { + if findCid(c, wl) { + count++ + } + } + if count != len(highPrioCids) { + t.Fatal("expected", len(highPrioCids), "of the new blocks, found", count) + } + t.Log("Sent message with", len(blockCids), "low-priority wants.", count, "accepted replacing wants without blocks from previous message") + + // Send the original wants. Some should replace the existing wants that do + // not have blocks associated, and the rest should overwrite the existing + // ones. + m = message.New(false) + for _, c := range origCids { + m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) + } + warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + wl = warsaw.Engine.WantlistForPeer(riga.Peer) + for _, c := range origCids { + if !findCid(c, wl) { + t.Fatal("missing low-priority original wants to overwrite existing") + } + } + t.Log("Sent message with", len(origCids), "original wants at low priority.
All accepted overwriting existing wants.") +} + +func findCid(c cid.Cid, wantList []wl.Entry) bool { + for i := range wantList { + if wantList[i].Cid == c { + return true + } + } + return false +} diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go index b79db226d..227e50de1 100644 --- a/bitswap/server/internal/decision/peer_ledger.go +++ b/bitswap/server/internal/decision/peer_ledger.go @@ -12,20 +12,31 @@ type DefaultPeerLedger struct { // these two maps are inversions of each other peers map[peer.ID]map[cid.Cid]entry cids map[cid.Cid]map[peer.ID]entry + // value 0 means no limit + maxEntriesPerPeer int } -func NewDefaultPeerLedger() *DefaultPeerLedger { +func NewDefaultPeerLedger(maxEntriesPerPeer uint) *DefaultPeerLedger { return &DefaultPeerLedger{ peers: make(map[peer.ID]map[cid.Cid]entry), cids: make(map[cid.Cid]map[peer.ID]entry), + + maxEntriesPerPeer: int(maxEntriesPerPeer), } } -func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) { +// Wants adds an entry to the peer ledger. If adding the entry would make the +// peer ledger exceed the maxEntriesPerPeer limit, then the entry is not added +// and false is returned. +func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) bool { cids, ok := l.peers[p] if !ok { cids = make(map[cid.Cid]entry) l.peers[p] = cids + } else if l.maxEntriesPerPeer != 0 && len(cids) == l.maxEntriesPerPeer { + if _, ok = cids[e.Cid]; !ok { + return false // cannot add to peer ledger + } } cids[e.Cid] = entry{e.Priority, e.WantType} @@ -35,6 +46,8 @@ func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) { l.cids[e.Cid] = m } m[p] = entry{e.Priority, e.WantType} + + return true } func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool { @@ -42,13 +55,14 @@ func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool { if !ok { return false } + _, had := wants[k] delete(wants, k) if len(wants) == 0 { delete(l.peers, p) } l.removePeerFromCid(p, k) - return true + return had } func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) { diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 1e723ddce..85651a5ef 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -29,7 +29,7 @@ import ( var provideKeysBufferSize = 2048 var ( - log = logging.Logger("bitswap-server") + log = logging.Logger("bitswap/server") sflog = log.Desugar() ) diff --git a/docs/CODEOWNERS b/docs/CODEOWNERS index 7781c410c..94fb077e5 100644 --- a/docs/CODEOWNERS +++ b/docs/CODEOWNERS @@ -3,7 +3,7 @@ # requested to review draft pull requests. # Deafult -* @ipfs/kubo-maintainers +* @ipfs/kubo-maintainers # HTTP Gateway -gateway/ @lidel @hacdias +gateway/ @lidel diff --git a/docs/tracing.md b/docs/tracing.md index c43c10aed..868b68d95 100644 --- a/docs/tracing.md +++ b/docs/tracing.md @@ -9,7 +9,7 @@ for the `OTEL_TRACES_EXPORTER` environment variables. Therefore, we provide some helper functions under [`boxo/tracing`](../tracing/) to support these. In this document, we document the quirks of our custom support for the `OTEL_TRACES_EXPORTER`, -as well as examples on how to use tracing, create traceable headers, and how +as well as examples of how to use tracing, create traceable headers, and how to use the Jaeger UI. The [Gateway examples](../examples/gateway/) fully support Tracing.
- [Environment Variables](#environment-variables) diff --git a/examples/car-file-fetcher/README.md b/examples/car-file-fetcher/README.md index d44b9cb59..97894251d 100644 --- a/examples/car-file-fetcher/README.md +++ b/examples/car-file-fetcher/README.md @@ -2,7 +2,7 @@ This example shows how to download a UnixFS file or directory from a gateway that implements [application/vnd.ipld.car](https://www.iana.org/assignments/media-types/application/vnd.ipld.car) -responses of the [Trustles Gateway](https://specs.ipfs.tech/http-gateways/trustless-gateway/) +responses of the [Trustless Gateway](https://specs.ipfs.tech/http-gateways/trustless-gateway/) specification, in a trustless, verifiable manner. It relies on [IPIP-402](https://specs.ipfs.tech/ipips/ipip-0402/) to retrieve diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index c6fbd67ed..3c1ab3f72 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -130,9 +130,9 @@ func TestHeaders(t *testing.T) { path string cacheControl string }{ - {"/ipns/example.net/", "public, max-age=30"}, // As generated directory listing - {"/ipns/example.com/", "public, max-age=55"}, // As generated directory listing (different) - {"/ipns/unknown.com/", ""}, // As generated directory listing (unknown) + {"/ipns/example.net/", "public, max-age=30, stale-while-revalidate=2678400"}, // As generated directory listing + {"/ipns/example.com/", "public, max-age=55, stale-while-revalidate=2678400"}, // As generated directory listing (different) + {"/ipns/unknown.com/", ""}, // As generated directory listing (unknown TTL) {"/ipns/example.net/foo/", "public, max-age=30"}, // As index.html directory listing {"/ipns/example.net/foo/index.html", "public, max-age=30"}, // As deserialized UnixFS file {"/ipns/example.net/?format=raw", "public, max-age=30"}, // As Raw block diff --git a/gateway/handler_unixfs_dir.go b/gateway/handler_unixfs_dir.go index 7a49dcafc..6f9f856d1 100644 --- a/gateway/handler_unixfs_dir.go +++ b/gateway/handler_unixfs_dir.go @@ -136,9 +136,14 @@ func (i *handler) serveDirectory(ctx context.Context, w http.ResponseWriter, r * dirEtag := getDirListingEtag(resolvedPath.RootCid()) w.Header().Set("Etag", dirEtag) - // Add TTL if known. + // Set Cache-Control if rq.ttl > 0 { - w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d", int(rq.ttl.Seconds()))) + // Use known TTL from IPNS Record or DNSLink TXT Record + w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d, stale-while-revalidate=2678400", int(rq.ttl.Seconds()))) + } else if !rq.contentPath.Mutable() { + // Cache for 1 week, serve stale cache for up to a month + // (style of generated HTML may change, should not be cached forever) + w.Header().Set("Cache-Control", "public, max-age=604800, stale-while-revalidate=2678400") } if r.Method == http.MethodHead { diff --git a/out b/out new file mode 100644 index 000000000..e69de29bb
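For reference, the Cache-Control policy that handler_unixfs_dir.go above applies to generated UnixFS directory listings can be summarized as follows. This is a simplified sketch, not the gateway's actual code path; `dirListingCacheControl` is an illustrative helper name:

```go
package main

import (
	"fmt"
	"time"
)

// dirListingCacheControl mirrors the policy above for generated UnixFS
// directory listings: use the known TTL when an IPNS Record or DNSLink TXT
// record provides one, cache immutable /ipfs/ paths for a week, and send no
// Cache-Control when a mutable path has an unknown TTL.
func dirListingCacheControl(ttl time.Duration, mutable bool) string {
	const staleWhileRevalidate = 2678400 // one month, matching the handler

	if ttl > 0 {
		return fmt.Sprintf("public, max-age=%d, stale-while-revalidate=%d", int(ttl.Seconds()), staleWhileRevalidate)
	}
	if !mutable {
		// Generated HTML may change between releases, so do not cache forever.
		return fmt.Sprintf("public, max-age=%d, stale-while-revalidate=%d", 604800, staleWhileRevalidate)
	}
	return ""
}

func main() {
	fmt.Println(dirListingCacheControl(30*time.Second, true)) // public, max-age=30, stale-while-revalidate=2678400
	fmt.Println(dirListingCacheControl(0, false))             // immutable path: week-long cache
	fmt.Println(dirListingCacheControl(0, true))              // mutable path, unknown TTL: no header
}
```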