Commit 3ec3cf5

Merge branch 'master' into provider-stats
2 parents 06ec289 + 861573b commit 3ec3cf5

File tree

3 files changed: +57 -85 lines changed

dht.go

Lines changed: 25 additions & 25 deletions

@@ -746,39 +746,39 @@ func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo {
     return peer.AddrInfo{}
 }
 
-// nearestPeersToQuery returns the routing tables closest peers.
-func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
-    closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
-    return closer
-}
-
-// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
-    closer := dht.nearestPeersToQuery(pmes, count)
-
-    // no node? nil
-    if closer == nil {
+// closestPeersToQuery returns the closest peers to the target key from the
+// local routing table, filtering out self and the requester's peer ID.
+//
+// Per the IPFS Kademlia DHT spec, servers SHOULD NOT include themselves or the
+// requester in FIND_NODE responses (except for FIND_PEER when the target is
+// self or the requester, which is handled separately in handleFindPeer).
+func (dht *IpfsDHT) closestPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
+    // Get count+1 closest peers to target key, so that we can filter out 'from'
+    // (and potentially 'self' if it somehow appears) and still return 'count' peers.
+    closestPeers := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count+1)
+
+    if len(closestPeers) == 0 {
         logger.Infow("no closer peers to send", from)
         return nil
     }
 
-    filtered := make([]peer.ID, 0, len(closer))
-    for _, clp := range closer {
-
-        // == to self? thats bad
-        if clp == dht.self {
-            logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
-            return nil
+    filtered := make([]peer.ID, 0, min(len(closestPeers), count))
+    for _, p := range closestPeers {
+        // Per spec: don't include self in responses. This should never happen since
+        // self should not be in the routing table, but check defensively.
+        if p == dht.self {
+            logger.Debugw("self found in routing table, skipping", "key", string(pmes.GetKey()))
+            continue
         }
-        // Dont send a peer back themselves
-        if clp == from {
+        // Per spec: don't include requester in responses (exception handled in handleFindPeer).
+        if p == from {
             continue
         }
-
-        filtered = append(filtered, clp)
+        filtered = append(filtered, p)
+        if len(filtered) >= count {
+            break
+        }
     }
-
-    // ok seems like closer nodes
     return filtered
 }

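For illustration, here is a minimal standalone sketch of the "fetch count+1, then filter" pattern that the new closestPeersToQuery relies on: request one extra candidate so that dropping self and the requester still leaves up to count peers. The helper name filterClosest and the use of plain strings in place of peer.ID are assumptions made for this sketch, not part of the patch.

package main

import "fmt"

// filterClosest mirrors the pattern above: given up to count+1 candidates
// ordered by distance, skip 'self' and 'from' and return at most 'count' IDs.
// (min on ints is a builtin since Go 1.21.)
func filterClosest(candidates []string, self, from string, count int) []string {
	filtered := make([]string, 0, min(len(candidates), count))
	for _, p := range candidates {
		if p == self || p == from {
			continue // never return self or the requester
		}
		filtered = append(filtered, p)
		if len(filtered) >= count {
			break
		}
	}
	return filtered
}

func main() {
	candidates := []string{"peerA", "peerB", "requester", "peerC", "peerD"}
	fmt.Println(filterClosest(candidates, "self", "requester", 3)) // [peerA peerB peerC]
}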
handlers.go

Lines changed: 22 additions & 40 deletions

@@ -7,7 +7,7 @@ import (
     "time"
 
     "github.com/libp2p/go-libp2p/core/peer"
-    pstore "github.com/libp2p/go-libp2p/p2p/host/peerstore"
+    "github.com/libp2p/go-libp2p/core/peerstore"
 
     ds "github.com/ipfs/go-datastore"
     "github.com/libp2p/go-libp2p-kad-dht/internal"
@@ -66,12 +66,11 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
     resp.Record = rec
 
     // Find closest peer on given cluster to desired key and reply with that info
-    closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
-    if len(closer) > 0 {
-        // TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-        closerinfos := pstore.PeerInfos(dht.peerstore, closer)
-        for _, pi := range closerinfos {
-            logger.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
+    closestPeers := dht.closestPeersToQuery(pmes, p, dht.bucketSize)
+    if len(closestPeers) > 0 {
+        closestInfos := peerstore.AddrInfos(dht.peerstore, closestPeers)
+        for _, pi := range closestInfos {
+            logger.Debugf("handleGetValue returning closest peer: '%s'", pi.ID)
             if len(pi.Addrs) < 1 {
                 logger.Warnw("no addresses on peer being sent",
                     "local", dht.self,
@@ -81,7 +80,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
             }
         }
 
-        resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
+        resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closestInfos)
     }
 
     return resp, nil
@@ -252,42 +251,26 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (
 
 func (dht *IpfsDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) {
     resp := pb.NewMessage(pmes.GetType(), nil, pmes.GetClusterLevel())
-    var closest []peer.ID
 
     if len(pmes.GetKey()) == 0 {
         return nil, errors.New("handleFindPeer with empty key")
     }
 
-    // if looking for self... special case where we send it on CloserPeers.
     targetPid := peer.ID(pmes.GetKey())
-    closest = dht.betterPeersToQuery(pmes, from, dht.bucketSize)
-
-    // Never tell a peer about itself.
-    if targetPid != from {
-        // Add the target peer to the set of closest peers if
-        // not already present in our routing table.
-        //
-        // Later, when we lookup known addresses for all peers
-        // in this set, we'll prune this peer if we don't
-        // _actually_ know where it is.
-        found := false
-        for _, p := range closest {
-            if targetPid == p {
-                found = true
-                break
-            }
-        }
-        if !found {
-            closest = append(closest, targetPid)
-        }
-    }
+    closest := dht.closestPeersToQuery(pmes, from, dht.bucketSize)
 
-    if closest == nil {
-        return resp, nil
+    // Prepend targetPid to the front of the list if not already present.
+    // targetPid is always the closest key to itself.
+    //
+    // Per IPFS Kademlia DHT spec: FIND_PEER has a special exception where the
+    // target peer MUST be included in the response (if present in peerstore),
+    // even if it is self, the requester, or not a DHT server. This allows peers
+    // to discover multiaddresses for any peer, not just DHT servers.
+    if len(closest) == 0 || closest[0] != targetPid {
+        closest = append([]peer.ID{targetPid}, closest...)
     }
 
-    // TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-    closestinfos := pstore.PeerInfos(dht.peerstore, closest)
+    closestinfos := peerstore.AddrInfos(dht.peerstore, closest)
     // possibly an over-allocation but this array is temporary anyways.
     withAddresses := make([]peer.AddrInfo, 0, len(closestinfos))
     for _, pi := range closestinfos {
@@ -326,11 +309,10 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 
     resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), filtered)
 
-    // Also send closer peers.
-    closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
-    if closer != nil {
-        // TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-        infos := pstore.PeerInfos(dht.peerstore, closer)
+    // Also send closest dht servers we know about.
+    closestPeers := dht.closestPeersToQuery(pmes, p, dht.bucketSize)
+    if closestPeers != nil {
+        infos := peerstore.AddrInfos(dht.peerstore, closestPeers)
         resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
     }

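To make the FIND_PEER exception above concrete, here is a small sketch of the prepend step in isolation, again using plain strings in place of peer.ID; the helper name ensureTargetFirst is hypothetical and exists only for this example.

package main

import "fmt"

// ensureTargetFirst puts target at the head of the list unless it is already
// there: the target key is always the closest key to itself, and per the
// FIND_PEER exception it must be returned even if it is self, the requester,
// or not a DHT server.
func ensureTargetFirst(closest []string, target string) []string {
	if len(closest) == 0 || closest[0] != target {
		return append([]string{target}, closest...)
	}
	return closest
}

func main() {
	fmt.Println(ensureTargetFirst([]string{"peerB", "peerC"}, "peerA")) // [peerA peerB peerC]
	fmt.Println(ensureTargetFirst([]string{"peerA", "peerB"}, "peerA")) // [peerA peerB]
}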
provider/provider.go

Lines changed: 10 additions & 20 deletions

@@ -573,6 +573,7 @@ func (s *SweepingProvider) getAvgPrefixLenNoLock() (int, error) {
 // (typically 1 or 2), because exploring the keyspace would add too much
 // overhead for a small number of keys.
 func (s *SweepingProvider) vanillaProvide(k mh.Multihash, reprovide bool) (bitstr.Key, error) {
+    keys := []mh.Multihash{k}
     // Add provider record to local provider store.
     s.addLocalRecord(k)
     // Get peers to which the record will be allocated.
@@ -584,7 +585,7 @@
     addrInfo := peer.AddrInfo{ID: s.peerid, Addrs: s.getSelfAddrs()}
     keysAllocations := make(map[peer.ID][]mh.Multihash)
     for _, p := range peers {
-        keysAllocations[p] = []mh.Multihash{k}
+        keysAllocations[p] = keys
     }
     reachablePeers, err := s.sendProviderRecords(keysAllocations, addrInfo, 1)
 
@@ -598,6 +599,10 @@
     }
     s.stats.cycleStatsLk.Unlock()
 
+    if err == nil {
+        logger.Debugw("sent provider record", "prefix", coveredPrefix, "count", 1, "keys", keys)
+    }
+
     return coveredPrefix, err
 }
 
@@ -1160,6 +1165,7 @@ func (s *SweepingProvider) batchProvide(prefix bitstr.Key, keys []mh.Multihash)
     if keyCount == 0 {
         return
     }
+    logger.Debugw("batchProvide called", "prefix", prefix, "count", len(keys))
     addrInfo, ok := s.selfAddrInfo()
     if !ok {
         // Don't provide if the node doesn't have a valid address to include in the
@@ -1415,6 +1421,9 @@ func (s *SweepingProvider) provideRegions(regions []keyspace.Region, addrInfo pe
             }
             continue
         }
+        keyCount := len(allKeys)
+        s.provideCounter.Add(s.ctx, int64(keyCount))
+        logger.Debugw("sent provider records", "prefix", r.Prefix, "count", keyCount, "keys", allKeys)
     }
     // If at least 1 regions was provided, we don't consider it a failure.
     return errCount < len(regions)
@@ -1506,25 +1515,6 @@ func (s *SweepingProvider) Clear() int {
     return s.provideQueue.Clear()
 }
 
-// ProvideState encodes the current relationship between this node and `key`.
-type ProvideState uint8
-
-const (
-    StateUnknown ProvideState = iota // we have no record of the key
-    StateQueued // key is queued to be provided
-    StateProvided // key was provided at least once
-)
-
-// ProvideStatus reports the provider’s view of a key.
-//
-// When `state == StateProvided`, `lastProvide` is the wall‑clock time of the
-// most recent successful provide operation (UTC).
-// For `StateQueued` or `StateUnknown`, `lastProvide` is the zero `time.Time`.
-func (s *SweepingProvider) ProvideStatus(key mh.Multihash) (state ProvideState, lastProvide time.Time) {
-    // TODO: implement me
-    return StateUnknown, time.Time{}
-}
-
 // AddToSchedule makes sure the prefixes associated with the supplied keys are
 // scheduled to be reprovided.
 //

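The provideRegions change also bumps s.provideCounter by the number of keys sent for each successfully provided region. The counter's definition is not part of this diff; assuming it is an OpenTelemetry Int64Counter, a minimal sketch of creating and incrementing such a counter could look like the following (the meter and metric names are illustrative only, not the ones used by the provider).

package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	ctx := context.Background()

	// With no SDK configured, otel.Meter returns a no-op meter; wiring a real
	// exporter is out of scope for this sketch.
	meter := otel.Meter("example/provider")
	provideCounter, err := meter.Int64Counter(
		"provides.sent", // illustrative metric name
		metric.WithDescription("number of provider records sent"),
	)
	if err != nil {
		log.Fatal(err)
	}

	keyCount := 3
	provideCounter.Add(ctx, int64(keyCount)) // mirrors s.provideCounter.Add(s.ctx, int64(keyCount))
}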