Skip to content

Commit 031df82

Browse files
authored
fix(pinner): restore indirect pin detection and add context cancellation (#1039)
* fix(pinner): add context cancellation checks for indirect pins

  - Add context checks at the start of each recursive pin traversal.
  - Extract the traverseIndirectPins helper to eliminate duplicate code.
  - Add tests for context cancellation behavior.

  This fixes a pre-existing issue where indirect pin traversal never checked for context cancellation, causing operations to continue even after cancellation was requested. This is especially important for large pin sets with many recursive pins.

* fix: indirect pin detection regression

  Recursively pinned roots should not show as indirect. Objects can be both directly and indirectly pinned. Restores behavior from before PR #1035.

* docs: clarify pin type precedence in indirect checks

  Explain why recursive pins are excluded from indirect results while direct pins are not. This asymmetry is intentional and preserved for compatibility with established IPFS behavior.
1 parent a04432d commit 031df82

File tree

2 files changed

+130
-56
lines changed

2 files changed

+130
-56
lines changed

pinning/pinner/dspinner/pin.go

Lines changed: 75 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -646,35 +646,9 @@ func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode
646646

647647
// Check for indirect pins
648648
if toCheck.Len() > 0 {
649-
var walkErr error
650-
visited := cid.NewSet()
651-
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
652-
var rk cid.Cid
653-
rk, walkErr = cid.Cast([]byte(key))
654-
if walkErr != nil {
655-
return false
656-
}
657-
walkErr = merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
658-
if toCheck.Len() == 0 || !visited.Visit(c) {
659-
return false
660-
}
661-
if toCheck.Has(c) {
662-
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
663-
toCheck.Remove(c)
664-
}
665-
return true
666-
}, merkledag.Concurrent())
667-
if walkErr != nil {
668-
return false
669-
}
670-
return toCheck.Len() > 0
671-
})
672-
if err != nil {
649+
if err := p.traverseIndirectPins(ctx, toCheck, &pinned); err != nil {
673650
return nil, err
674651
}
675-
if walkErr != nil {
676-
return nil, walkErr
677-
}
678652
}
679653

680654
// Anything left in toCheck is not pinned
@@ -741,48 +715,93 @@ func (p *pinner) checkPinsInIndex(ctx context.Context, mode ipfspinner.Mode, inc
741715
return pinned, nil
742716
}
743717

718+
// traverseIndirectPins is a helper that traverses all recursive pins to find indirect pins.
// It iterates over every key in p.cidRIndex, casts the key to a root CID, and
// walks the DAG under that root looking for the CIDs held in toCheck. Each
// match is appended to *pinned with Mode ipfspinner.Indirect and Via set to the
// root being walked.
719+
// It modifies the pinned slice and toCheck set in place.
// Matched CIDs are removed from toCheck, so after a nil return whatever is
// still in toCheck was not found under any traversed root.
//
// The error returned by cidRIndex.ForEach takes precedence; otherwise the
// error recorded inside the callback is returned (ctx.Err() on cancellation,
// a cid.Cast failure, or a merkledag.Walk failure). On a non-nil return,
// *pinned and toCheck may already have been partially updated.
720+
func (p *pinner) traverseIndirectPins(ctx context.Context, toCheck *cid.Set, pinned *[]ipfspinner.Pinned) error {
721+
// walkErr carries a failure out of the ForEach callback, which can only return a bool.
var walkErr error
722+
// visited is shared across all roots, so a CID already seen under one root
// is not descended into again under another.
visited := cid.NewSet()
723+
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
724+
// Check for context cancellation at the start of each recursive pin
// so that a cancelled caller does not start another DAG walk.
725+
select {
726+
case <-ctx.Done():
727+
walkErr = ctx.Err()
728+
return false
729+
default:
730+
}
731+
732+
var rk cid.Cid
733+
// The index key is treated as the raw bytes of the root CID.
rk, walkErr = cid.Cast([]byte(key))
734+
if walkErr != nil {
735+
return false
736+
}
737+
// NOTE(review): the visit callback below mutates toCheck, visited and *pinned
// without locking while the walk runs with merkledag.Concurrent(); confirm
// that merkledag.Walk serializes calls to the visit function.
walkErr = merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
738+
// Returning false is presumably how the walk is told not to descend
// further: either nothing is left to find, or c was already visited.
if toCheck.Len() == 0 || !visited.Visit(c) {
739+
return false
740+
}
741+
if toCheck.Has(c) {
742+
*pinned = append(*pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
743+
// Remove the match so it is reported once, via the first root that reaches it.
toCheck.Remove(c)
744+
}
745+
return true
746+
}, merkledag.Concurrent())
747+
if walkErr != nil {
748+
return false
749+
}
750+
// Keep going to the next root only while there are CIDs left to find.
return toCheck.Len() > 0
751+
})
752+
if err != nil {
753+
return err
754+
}
755+
if walkErr != nil {
756+
return walkErr
757+
}
758+
return nil
759+
}
760+
744761
// checkIndirectPins checks if the given cids are pinned indirectly
745762
func (p *pinner) checkIndirectPins(ctx context.Context, cids ...cid.Cid) ([]ipfspinner.Pinned, error) {
746763
pinned := make([]ipfspinner.Pinned, 0, len(cids))
747764
toCheck := cid.NewSet()
748765

749-
// Check all CIDs for indirect pins, regardless of their direct pin status
750-
// A CID can be both directly pinned AND indirectly pinned through a parent
766+
// Filter out CIDs that are recursively pinned at the root level.
767+
// A recursively pinned CID is not considered indirect because recursive pins
768+
// are comprehensive (include all children), making "recursive" take precedence
769+
// over "indirect".
770+
//
771+
// However, we do NOT filter out direct pins here. Direct pins only pin a
772+
// single block, not its children. Therefore, a CID can legitimately be both:
773+
// - Directly pinned (explicitly pinned as a single block)
774+
// - Indirectly pinned (referenced by another pinned object's DAG)
775+
// This is why the asymmetry between recursive and direct pins is intentional.
776+
//
777+
// NOTE: While this behavior may feel arbitrary, we preserve it for compatibility
778+
// as this is how 'ipfs pin ls' has behaved for nearly a decade. The test
779+
// t0081-repo-pinning.sh in Kubo explicitly expects a CID to be both direct
780+
// and indirect, guarding this established behavior.
751781
for _, c := range cids {
782+
cidKey := c.KeyString()
783+
784+
// Check if recursively pinned
785+
ids, err := p.cidRIndex.Search(ctx, cidKey)
786+
if err != nil {
787+
return nil, err
788+
}
789+
if len(ids) > 0 {
790+
// This CID is recursively pinned at root level, not indirect
791+
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.NotPinned})
792+
continue
793+
}
794+
795+
// Still check for indirect even if directly pinned
796+
// A CID can be both direct and indirect
752797
toCheck.Add(c)
753798
}
754799

755800
// Now check for indirect pins by traversing recursive pins
756801
if toCheck.Len() > 0 {
757-
var walkErr error
758-
visited := cid.NewSet()
759-
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
760-
var rk cid.Cid
761-
rk, walkErr = cid.Cast([]byte(key))
762-
if walkErr != nil {
763-
return false
764-
}
765-
walkErr = merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
766-
if toCheck.Len() == 0 || !visited.Visit(c) {
767-
return false
768-
}
769-
if toCheck.Has(c) {
770-
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
771-
toCheck.Remove(c)
772-
}
773-
return true
774-
}, merkledag.Concurrent())
775-
if walkErr != nil {
776-
return false
777-
}
778-
return toCheck.Len() > 0
779-
})
780-
if err != nil {
802+
if err := p.traverseIndirectPins(ctx, toCheck, &pinned); err != nil {
781803
return nil, err
782804
}
783-
if walkErr != nil {
784-
return nil, walkErr
785-
}
786805
}
787806

788807
// Anything left in toCheck is not pinned

pinning/pinner/dspinner/pin_withtype_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package dspinner
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
7+
"time"
68

79
bs "github.com/ipfs/boxo/blockservice"
810
blockstore "github.com/ipfs/boxo/blockstore"
@@ -155,4 +157,57 @@ func TestCheckIfPinnedWithType(t *testing.T) {
155157

156158
require.Equal(t, pinned1, pinned2)
157159
})
160+
161+
t.Run("Context cancellation during indirect check with many pins", func(t *testing.T) {
162+
// Create many recursive pins to ensure we can hit the cancellation
163+
nodes := make([]cid.Cid, 50)
164+
for i := 0; i < 50; i++ {
165+
node := mdag.NodeWithData([]byte(fmt.Sprintf("recursive node %d", i)))
166+
err = dserv.Add(ctx, node)
167+
require.NoError(t, err)
168+
nodes[i] = node.Cid()
169+
170+
p.PinWithMode(ctx, nodes[i], ipfspin.Recursive, fmt.Sprintf("recursive-%d", i))
171+
}
172+
err = p.Flush(ctx)
173+
require.NoError(t, err)
174+
175+
// Create a context that we can cancel
176+
cancelCtx, cancel := context.WithCancel(ctx)
177+
178+
// Start checking in a goroutine
179+
done := make(chan struct{})
180+
var checkErr error
181+
go func() {
182+
defer close(done)
183+
// Try to check for an indirect pin with many recursive pins to traverse
184+
_, checkErr = p.CheckIfPinnedWithType(cancelCtx, ipfspin.Indirect, withoutNames, ck)
185+
}()
186+
187+
// Cancel the context quickly
188+
time.Sleep(1 * time.Millisecond)
189+
cancel()
190+
191+
// Wait for the check to complete
192+
<-done
193+
194+
// Should get a context cancellation error (or nil if it completed too fast)
195+
// The important thing is that it doesn't hang
196+
if checkErr != nil {
197+
require.Equal(t, context.Canceled, checkErr)
198+
}
199+
})
200+
201+
t.Run("Context cancellation in checkIndirectPins", func(t *testing.T) {
202+
// Create a context that we can cancel immediately
203+
cancelCtx, cancel := context.WithCancel(ctx)
204+
cancel() // Cancel immediately
205+
206+
// Try to check for indirect pins with cancelled context
207+
_, err = p.CheckIfPinnedWithType(cancelCtx, ipfspin.Indirect, withoutNames, ck)
208+
209+
// Should get a context cancellation error
210+
require.Error(t, err)
211+
require.Equal(t, context.Canceled, err)
212+
})
158213
}

0 commit comments

Comments
 (0)