Skip to content

Commit b75fe58

Browse files
1. Expire non-provider records older than MaxAge
2. Original publisher should republish putvalue records 3. Peers who receive a record will republish hourly
1 parent a12e621 commit b75fe58

File tree

10 files changed

+399
-14
lines changed

10 files changed

+399
-14
lines changed

dht.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type IpfsDHT struct {
5353
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
5454
providers *providers.ProviderManager
5555

56+
nonProvRecordsManager *NonProvRecordsManager
57+
5658
birth time.Time // When this peer started up
5759

5860
Validator record.Validator
@@ -98,13 +100,15 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
98100
})
99101

100102
dht.proc.AddChild(dht.providers.Process())
103+
dht.proc.AddChild(dht.nonProvRecordsManager.Process())
101104
dht.Validator = cfg.Validator
102105

103106
if !cfg.Client {
104107
for _, p := range cfg.Protocols {
105108
h.SetStreamHandler(p, dht.handleNewStream)
106109
}
107110
}
111+
108112
return dht, nil
109113
}
110114

@@ -156,6 +160,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
156160
}
157161

158162
dht.ctx = dht.newContextWithLocalTags(ctx)
163+
dht.nonProvRecordsManager = NewNonProvRecordsManager(ctx, dht, dstore)
159164

160165
return dht
161166
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/mr-tron/base58 v1.1.2
2222
github.com/multiformats/go-multiaddr v0.0.4
2323
github.com/multiformats/go-multiaddr-dns v0.0.2
24+
github.com/multiformats/go-multihash v0.0.5
2425
github.com/multiformats/go-multistream v0.1.0
2526
github.com/stretchr/testify v1.3.0
2627
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc

handlers.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
121121
recordIsBad = true
122122
}
123123

124-
if time.Since(recvtime) > MaxRecordAge {
124+
if time.Since(recvtime) > maxNonProvRecordAge {
125125
logger.Debug("old record found, tossing.")
126126
recordIsBad = true
127127
}
@@ -396,3 +396,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
396396
func convertToDsKey(s []byte) ds.Key {
397397
return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
398398
}
399+
400+
func convertToOriginalKey(k string) ([]byte, error) {
401+
return base32.RawStdEncoding.DecodeString(k)
402+
}

non_prov_records.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package dht
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
"github.com/gogo/protobuf/proto"
11+
ds "github.com/ipfs/go-datastore"
12+
"github.com/ipfs/go-datastore/query"
13+
u "github.com/ipfs/go-ipfs-util"
14+
"github.com/jbenet/goprocess"
15+
"github.com/jbenet/goprocess/context"
16+
"github.com/libp2p/go-libp2p-kad-dht/providers"
17+
recpb "github.com/libp2p/go-libp2p-record/pb"
18+
)
19+
20+
// vars for cleaning up expired records
21+
var nonProvRecordCleanupInterval = time.Hour
22+
23+
// maxNonProvRecordAge specifies the maximum time that any node will hold onto a record
24+
// from the time its received. This does not apply to any other forms of validity that
25+
// the record may contain.
26+
// For example, a record may contain an ipns entry with an EOL saying its valid
27+
// until the year 2020 (a great time in the future). For that record to stick around
28+
// it must be rebroadcasted more frequently than once every 'maxNonProvRecordAge'
29+
var maxNonProvRecordAge = time.Hour * 36
30+
31+
// vars for republishing records
32+
var nonProvRecordRePublishInterval = 1 * time.Hour
33+
var nonProvRecordRePublishAge = 1 * time.Hour
34+
var enableRepublishJitter = true
35+
36+
type NonProvRecordsManager struct {
37+
dht *IpfsDHT
38+
ctx context.Context
39+
40+
proc goprocess.Process
41+
dstore ds.Batching
42+
43+
cleanupInterval time.Duration // scan interval for expiring records
44+
45+
rePublishInterval time.Duration // scan interval for republishing records
46+
}
47+
48+
func NewNonProvRecordsManager(ctx context.Context, dht *IpfsDHT, dstore ds.Batching) *NonProvRecordsManager {
49+
m := new(NonProvRecordsManager)
50+
m.dht = dht
51+
m.ctx = ctx
52+
m.dstore = dstore
53+
m.proc = goprocessctx.WithContext(ctx)
54+
55+
// expire records beyond maxage
56+
m.cleanupInterval = nonProvRecordCleanupInterval
57+
m.proc.Go(m.expire)
58+
59+
// republish records older than prescribed age
60+
m.rePublishInterval = nonProvRecordRePublishInterval
61+
m.proc.Go(m.rePublish)
62+
63+
return m
64+
}
65+
66+
func (m *NonProvRecordsManager) Process() goprocess.Process {
67+
return m.proc
68+
}
69+
70+
func (m *NonProvRecordsManager) rePublish(proc goprocess.Process) {
71+
for {
72+
var d = 0 * time.Minute
73+
// minimizes the probability of all peers re-publishing together
74+
// the first peer that re-publishes resets the receivedAt time on the record
75+
// on all other peers that are among the K closest to the key, thus minimizing the number of republishes by other peers
76+
if enableRepublishJitter {
77+
d = time.Duration(rand.Intn(16)) * time.Minute
78+
}
79+
80+
select {
81+
case <-proc.Closing():
82+
return
83+
case <-time.After(m.rePublishInterval + d):
84+
}
85+
86+
tFnc := func(t time.Time) bool {
87+
return time.Since(t) > nonProvRecordRePublishAge && time.Since(t) < maxNonProvRecordAge
88+
}
89+
90+
res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
91+
if err != nil {
92+
logger.Errorf("republish records proc: failed to run query against datastore, error is %s", err)
93+
continue
94+
}
95+
96+
var wg sync.WaitGroup
97+
// semaphore to rate-limit number of concurrent PutValue calls
98+
semaphore := make(chan struct{}, 5)
99+
for {
100+
e, ok := res.NextSync()
101+
if !ok {
102+
break
103+
}
104+
105+
semaphore <- struct{}{}
106+
wg.Add(1)
107+
go func(e query.Result) {
108+
defer func() {
109+
<-semaphore
110+
wg.Done()
111+
}()
112+
113+
// unmarshal record
114+
rec := new(recpb.Record)
115+
if err := proto.Unmarshal(e.Value, rec); err != nil {
116+
logger.Debugf("republish records proc: failed to unmarshal DHT record from datastore, error is %s", err)
117+
return
118+
}
119+
120+
// call put value
121+
putCtx, cancel := context.WithTimeout(m.ctx, 2*time.Minute)
122+
defer cancel()
123+
124+
// do not use e.key here as that represents the transformed version of the original key
125+
// rec.GetKey is the original key sent by the peer who put this record to dht
126+
if err := m.dht.PutValue(putCtx, string(rec.GetKey()), rec.Value); err != nil {
127+
logger.Debugf("republish records proc: failed to re-publish to the network, error is %s", err)
128+
}
129+
}(e)
130+
}
131+
wg.Wait()
132+
}
133+
}
134+
135+
func (m *NonProvRecordsManager) expire(proc goprocess.Process) {
136+
for {
137+
select {
138+
case <-proc.Closing():
139+
return
140+
case <-time.After(m.cleanupInterval):
141+
}
142+
143+
tFnc := func(t time.Time) bool {
144+
return time.Since(t) > maxNonProvRecordAge
145+
}
146+
147+
res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
148+
if err != nil {
149+
logger.Errorf("expire records proc: failed to run query against datastore, error is %s", err)
150+
continue
151+
}
152+
153+
for {
154+
e, ok := res.NextSync()
155+
if !ok {
156+
break
157+
}
158+
if err := m.dstore.Delete(ds.RawKey(e.Key)); err != nil {
159+
logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %s", e.Key, err)
160+
}
161+
}
162+
}
163+
}
164+
165+
type timeFilterFnc = func(t time.Time) bool
166+
167+
type nonProvRecordFilter struct {
168+
tFnc timeFilterFnc
169+
}
170+
171+
func (f *nonProvRecordFilter) Filter(e query.Entry) bool {
172+
// unmarshal record
173+
rec := new(recpb.Record)
174+
if err := proto.Unmarshal(e.Value, rec); err != nil {
175+
logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %s", err)
176+
return false
177+
}
178+
179+
// should not be a provider record
180+
if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
181+
return false
182+
}
183+
184+
// parse received time
185+
t, err := u.ParseRFC3339(rec.TimeReceived)
186+
if err != nil {
187+
logger.Debugf("expire records filter: failed to parse time in DHT record, error is %s", err)
188+
return false
189+
}
190+
191+
// apply the time filter fnc to the received time
192+
return f.tFnc(t)
193+
}

0 commit comments

Comments
 (0)