Skip to content

Commit 06ec289

Browse files
Merge branch 'master' into provider-stats
2 parents 56a8625 + 0114a72 commit 06ec289

File tree

11 files changed

+404
-172
lines changed

11 files changed

+404
-172
lines changed

provider/buffered/provider.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const (
2929

3030
var _ internal.Provider = (*SweepingProvider)(nil)
3131

32-
// buffered.SweepingProvider is a wrapper around a SweepingProvider buffering
32+
// SweepingProvider (buffered) is a wrapper around a SweepingProvider buffering
3333
// requests, to allow core operations to return instantly. Operations are
3434
// queued and processed asynchronously in batches for improved performance.
3535
type SweepingProvider struct {

provider/dual/options.go

Lines changed: 26 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"time"
77

8-
ds "github.com/ipfs/go-datastore"
98
"github.com/libp2p/go-libp2p-kad-dht/amino"
109
"github.com/libp2p/go-libp2p-kad-dht/dual"
1110
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
@@ -38,53 +37,44 @@ type config struct {
3837

3938
type Option func(opt *config) error
4039

41-
func (cfg *config) apply(opts ...Option) error {
42-
for i, o := range opts {
43-
if err := o(cfg); err != nil {
44-
return fmt.Errorf("dual dht provider option %d failed: %w", i, err)
40+
// getOpts creates a config and applies Options to it.
41+
func getOpts(opts []Option, d *dual.DHT) (config, error) {
42+
cfg := config{
43+
reprovideInterval: [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval},
44+
maxReprovideDelay: [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay},
45+
46+
offlineDelay: [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay},
47+
connectivityCheckOnlineInterval: [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval},
48+
49+
maxWorkers: [2]int{4, 4},
50+
dedicatedPeriodicWorkers: [2]int{2, 2},
51+
dedicatedBurstWorkers: [2]int{1, 1},
52+
maxProvideConnsPerWorker: [2]int{20, 20},
53+
}
54+
55+
// Apply options
56+
for i, opt := range opts {
57+
if err := opt(&cfg); err != nil {
58+
return config{}, fmt.Errorf("dual dht provider option %d failed: %w", i, err)
4559
}
4660
}
47-
return nil
48-
}
4961

50-
func (cfg *config) resolveDefaults(d *dual.DHT) {
62+
// Resolve defaults
5163
if cfg.msgSenders[lanID] == nil {
5264
cfg.msgSenders[lanID] = d.LAN.MessageSender()
5365
}
5466
if cfg.msgSenders[wanID] == nil {
5567
cfg.msgSenders[wanID] = d.WAN.MessageSender()
5668
}
57-
}
5869

59-
func (c *config) validate() error {
60-
if c.dedicatedPeriodicWorkers[lanID]+c.dedicatedBurstWorkers[lanID] > c.maxWorkers[lanID] {
61-
return errors.New("provider config: total dedicated workers exceed max workers")
70+
// Validate config
71+
if cfg.dedicatedPeriodicWorkers[lanID]+cfg.dedicatedBurstWorkers[lanID] > cfg.maxWorkers[lanID] {
72+
return config{}, errors.New("provider config: total dedicated workers exceed max workers")
6273
}
63-
if c.dedicatedPeriodicWorkers[wanID]+c.dedicatedBurstWorkers[wanID] > c.maxWorkers[wanID] {
64-
return errors.New("provider config: total dedicated workers exceed max workers")
74+
if cfg.dedicatedPeriodicWorkers[wanID]+cfg.dedicatedBurstWorkers[wanID] > cfg.maxWorkers[wanID] {
75+
return config{}, errors.New("provider config: total dedicated workers exceed max workers")
6576
}
66-
return nil
67-
}
68-
69-
var DefaultConfig = func(cfg *config) error {
70-
var err error
71-
cfg.keystore, err = keystore.NewKeystore(ds.NewMapDatastore())
72-
if err != nil {
73-
return err
74-
}
75-
76-
cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval}
77-
cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay}
78-
79-
cfg.offlineDelay = [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay}
80-
cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval}
81-
82-
cfg.maxWorkers = [2]int{4, 4}
83-
cfg.dedicatedPeriodicWorkers = [2]int{2, 2}
84-
cfg.dedicatedBurstWorkers = [2]int{1, 1}
85-
cfg.maxProvideConnsPerWorker = [2]int{20, 20}
86-
87-
return nil
77+
return cfg, nil
8878
}
8979

9080
func WithKeystore(ks keystore.Keystore) Option {

provider/dual/provider.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77

88
"github.com/ipfs/go-cid"
9+
"github.com/ipfs/go-datastore"
910
dht "github.com/libp2p/go-libp2p-kad-dht"
1011
"github.com/libp2p/go-libp2p-kad-dht/dual"
1112
"github.com/libp2p/go-libp2p-kad-dht/provider"
@@ -23,6 +24,8 @@ type SweepingProvider struct {
2324
LAN *provider.SweepingProvider
2425
WAN *provider.SweepingProvider
2526
keystore keystore.Keystore
27+
28+
cleanupFuncs []func() error
2629
}
2730

2831
// New creates a new SweepingProvider that manages provides and reprovides for
@@ -32,15 +35,19 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
3235
return nil, errors.New("cannot create sweeping provider for nil dual DHT")
3336
}
3437

35-
var cfg config
36-
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
38+
cfg, err := getOpts(opts, d)
3739
if err != nil {
3840
return nil, err
3941
}
40-
cfg.resolveDefaults(d)
41-
err = cfg.validate()
42-
if err != nil {
43-
return nil, err
42+
var cleanupFuncs []func() error
43+
if cfg.keystore == nil {
44+
ds := datastore.NewMapDatastore()
45+
cfg.keystore, err = keystore.NewKeystore(ds)
46+
if err != nil {
47+
ds.Close()
48+
return nil, fmt.Errorf("couldn't create a keystore: %w", err)
49+
}
50+
cleanupFuncs = []func() error{ds.Close, cfg.keystore.Close}
4451
}
4552

4653
sweepingProviders := make([]*provider.SweepingProvider, 2)
@@ -74,10 +81,11 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
7481
}
7582

7683
return &SweepingProvider{
77-
dht: d,
78-
LAN: sweepingProviders[0],
79-
WAN: sweepingProviders[1],
80-
keystore: cfg.keystore,
84+
dht: d,
85+
LAN: sweepingProviders[0],
86+
WAN: sweepingProviders[1],
87+
keystore: cfg.keystore,
88+
cleanupFuncs: cleanupFuncs,
8189
}, nil
8290
}
8391

@@ -102,9 +110,25 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) e
102110

103111
// Close stops both DHT providers and releases associated resources.
104112
func (s *SweepingProvider) Close() error {
105-
return s.runOnBoth(func(p *provider.SweepingProvider) error {
113+
err := s.runOnBoth(func(p *provider.SweepingProvider) error {
106114
return p.Close()
107115
})
116+
117+
if s.cleanupFuncs != nil {
118+
// Cleanup keystore and datastore if we created them
119+
var errs []error
120+
for i := len(s.cleanupFuncs) - 1; i >= 0; i-- { // LIFO: last-added is cleaned up first
121+
if f := s.cleanupFuncs[i]; f != nil {
122+
if err := f(); err != nil {
123+
errs = append(errs, err)
124+
}
125+
}
126+
}
127+
if len(errs) > 0 {
128+
err = errors.Join(append(errs, err)...)
129+
}
130+
}
131+
return err
108132
}
109133

110134
// ProvideOnce sends provider records for the specified keys to both DHT swarms

provider/internal/connectivity/connectivity.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ type ConnectivityChecker struct {
5555

5656
// New creates a new ConnectivityChecker instance.
5757
func New(checkFunc func() bool, opts ...Option) (*ConnectivityChecker, error) {
58-
var cfg config
59-
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
58+
cfg, err := getOpts(opts)
6059
if err != nil {
6160
return nil, err
6261
}

provider/internal/connectivity/options.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,21 @@ type config struct {
1414
onOnline func()
1515
}
1616

17-
func (cfg *config) apply(opts ...Option) error {
18-
for i, o := range opts {
19-
if err := o(cfg); err != nil {
20-
return fmt.Errorf("reprovider dht option %d failed: %w", i, err)
21-
}
22-
}
23-
return nil
24-
}
25-
2617
type Option func(opt *config) error
2718

28-
var DefaultConfig = func(cfg *config) error {
29-
cfg.onlineCheckInterval = 1 * time.Minute
30-
cfg.offlineDelay = 2 * time.Hour
31-
return nil
19+
// getOpts creates a config and applies Options to it.
20+
func getOpts(opts []Option) (config, error) {
21+
cfg := config{
22+
onlineCheckInterval: 1 * time.Minute,
23+
offlineDelay: 2 * time.Hour,
24+
}
25+
26+
for i, opt := range opts {
27+
if err := opt(&cfg); err != nil {
28+
return config{}, fmt.Errorf("connectivity option %d error: %s", i, err)
29+
}
30+
}
31+
return cfg, nil
3232
}
3333

3434
// WithOnlineCheckInterval sets the minimum interval between online checks.

provider/internal/keyspace/trie.go

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package keyspace
22

33
import (
4+
"slices"
5+
46
"github.com/libp2p/go-libp2p/core/peer"
57
mh "github.com/multiformats/go-multihash"
68

@@ -185,46 +187,92 @@ func pruneSubtrieAtDepth[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0,
185187
}
186188

187189
// TrieGaps returns all prefixes that aren't covered by a key (prefix) in the
188-
// trie. Combining the prefixes included in the trie with the gap prefixes
189-
// results in a full keyspace coverage.
190+
// trie, at the `target` location. Combining the prefixes included in the trie
191+
// with the gap prefixes results in a full keyspace coverage of the `target`
192+
// subtrie.
193+
//
194+
// Results are sorted according to the provided `order`.
190195
//
191-
// E.g Trie: ["00", "100"], GapsInTrie: ["01", "101", "11"]
192-
func TrieGaps[D any](t *trie.Trie[bitstr.Key, D]) []bitstr.Key {
196+
// Example:
197+
// - Trie: ["0000", "0010", "100"]
198+
// - Target: "0"
199+
// - Order: "0000",
200+
// - GapsInTrie: ["0001", "0011", "01"]
201+
func TrieGaps[K kad.Key[K], D any](t *trie.Trie[bitstr.Key, D], target bitstr.Key, order K) []bitstr.Key {
193202
if t.IsLeaf() {
194-
if t.HasKey() {
195-
return SiblingPrefixes(*t.Key())
203+
if k := t.Key(); k != nil {
204+
if IsBitstrPrefix(target, *k) {
205+
siblingPrefixes := SiblingPrefixes(*k)[len(target):]
206+
sortBitstrKeysByOrder(siblingPrefixes, order)
207+
return siblingPrefixes
208+
}
209+
if IsBitstrPrefix(*k, target) {
210+
// The only key in the trie is a prefix of target, meaning the whole
211+
// target is covered.
212+
return nil
213+
}
196214
}
197-
return []bitstr.Key{""}
215+
return []bitstr.Key{target}
198216
}
199-
return trieGapsAtDepth(t, 0)
217+
return trieGapsAtDepth(t, 0, target, order)
200218
}
201219

202-
func trieGapsAtDepth[D any](t *trie.Trie[bitstr.Key, D], depth int) []bitstr.Key {
220+
func trieGapsAtDepth[K kad.Key[K], D any](t *trie.Trie[bitstr.Key, D], depth int, target bitstr.Key, order K) []bitstr.Key {
203221
var gaps []bitstr.Key
204-
for i := range 2 {
222+
insideTarget := depth >= target.BitLen()
223+
b := int(order.Bit(depth))
224+
for _, i := range []int{b, 1 - b} {
225+
if !insideTarget && i != int(target.Bit(depth)) {
226+
continue
227+
}
205228
bstr := bitstr.Key(byte('0' + i))
206229
if b := t.Branch(i); b == nil {
207230
gaps = append(gaps, bstr)
208231
} else if b.IsLeaf() {
209232
if b.HasKey() {
210233
k := *b.Key()
211234
if len(k) > depth+1 {
212-
for _, siblingPrefix := range SiblingPrefixes(k)[depth+1:] {
235+
siblingPrefixes := SiblingPrefixes(k)[depth+1:]
236+
sortBitstrKeysByOrder(siblingPrefixes, order)
237+
for _, siblingPrefix := range siblingPrefixes {
213238
gaps = append(gaps, siblingPrefix[depth:])
214239
}
215240
}
216241
} else {
217242
gaps = append(gaps, bstr)
218243
}
219244
} else {
220-
for _, gap := range trieGapsAtDepth(b, depth+1) {
245+
for _, gap := range trieGapsAtDepth(b, depth+1, target, order) {
221246
gaps = append(gaps, bstr+gap)
222247
}
223248
}
224249
}
225250
return gaps
226251
}
227252

253+
// sortBitstrKeysByOrder sorts the provided bitstr keys according to the
254+
// provided order.
255+
func sortBitstrKeysByOrder[K kad.Key[K]](keys []bitstr.Key, order K) {
256+
slices.SortFunc(keys, func(a, b bitstr.Key) int {
257+
maxLen := min(len(a), len(b), order.BitLen())
258+
for i := range maxLen {
259+
if a[i] != b[i] {
260+
if a.Bit(i) == order.Bit(i) {
261+
return -1
262+
}
263+
return 1
264+
}
265+
}
266+
if len(a) == len(b) || maxLen == order.BitLen() {
267+
return 0
268+
}
269+
if len(a) < len(b) {
270+
return 1
271+
}
272+
return -1
273+
})
274+
}
275+
228276
// mapMerge merges all key-value pairs from the source map into the destination
229277
// map. Values from the source are appended to existing slices in the
230278
// destination.

0 commit comments

Comments
 (0)