WIP - Add/implement Iterable interface for ipfs badger
AndrewSisley committed Nov 26, 2021
1 parent e1c2caa commit a7c5f36
Showing 1 changed file with 261 additions and 0 deletions.
261 changes: 261 additions & 0 deletions datastores/badger/v3/iterator.go
@@ -0,0 +1,261 @@
package badger

// This is quite similar in principle to https://github.com/MikkelHJuul/bIter/blob/main/iterator.go that John linked - maybe just use/wrap that.
import (
    badger "github.com/dgraph-io/badger/v3"
    ds "github.com/ipfs/go-datastore"
    dsq "github.com/ipfs/go-datastore/query"
    goprocess "github.com/jbenet/goprocess"
)

// Iterable describes a datastore from which a prefix-scoped Iterator can be obtained.
type Iterable interface {
    GetIterator(q dsq.Query) (Iterator, error)
}

// Iterator streams all query results found under a given key prefix.
type Iterator interface {
    IteratePrefix(prefix ds.Key) dsq.Results
}

// BadgerIterator implements Iterator on top of a badger transaction.
type BadgerIterator struct {
    iterator       *badger.Iterator
    resultsBuilder *dsq.ResultBuilder
    query          dsq.Query
    txn            txn
    skipped        int
    sent           int
    closedEarly    bool
}

func (t *txn) GetIterator(q dsq.Query) (Iterator, error) {
    opt := badger.DefaultIteratorOptions
    opt.PrefetchValues = !q.KeysOnly

    // check that this can be removed!
    prefix := ds.NewKey(q.Prefix).String()
    if prefix != "/" {
        //opt.Prefix = []byte(prefix + "/")
    }

    // Handle ordering
    if len(q.Orders) > 0 {
        switch q.Orders[0].(type) {
        case dsq.OrderByKey, *dsq.OrderByKey:
            // We order by key by default.
        case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
            // Reverse order by key
            opt.Reverse = true
        default:
            // Ok, we have a weird order we can't handle. Let's
            // perform the _base_ query (prefix, filter, etc.), then
            // handle sort/offset/limit later.

            // Skip the stuff we can't apply.
            baseQuery := q
            baseQuery.Limit = 0
            baseQuery.Offset = 0
            baseQuery.Orders = nil

            // perform the base query.
            res, err := t.query(baseQuery)
            if err != nil {
                return nil, err
            }

            // fix the query
            res = dsq.ResultsReplaceQuery(res, q)

            // Remove the parts we've already applied.
            naiveQuery := q
            naiveQuery.Prefix = ""
            naiveQuery.Filters = nil

            // Apply the rest of the query:
            //return dsq.NaiveQueryApply(naiveQuery, res), nil //todo - this returns dsq.Results, not an Iterator
            panic("todo")
        }
    }

    badgerIterator := t.txn.NewIterator(opt)
    badgerIterator.Rewind()

    iterator := BadgerIterator{
        iterator: badgerIterator,
        query:    q, // needed by IteratePrefix for offset/limit/filter handling
        txn:      *t,
    }

    return &iterator, nil
}

// IteratePrefix seeks to the given prefix and streams all matching entries
// through a results builder, honouring the query's offset, limit and filters.
func (iterator *BadgerIterator) IteratePrefix(prefix ds.Key) dsq.Results {
    prefixString := prefix.String()
    if prefixString != "/" {
        prefixString = prefixString + "/"
    }
    prefixAsByteArray := []byte(prefixString)

    iterator.resultsBuilder = dsq.NewResultBuilder(iterator.query)

    iterator.resultsBuilder.Process.Go(func(worker goprocess.Process) {
        iterator.txn.ds.closeLk.RLock()
        iterator.closedEarly = false
        defer func() {
            iterator.txn.ds.closeLk.RUnlock()
            if iterator.closedEarly {
                select {
                case iterator.resultsBuilder.Output <- dsq.Result{
                    Error: ErrClosed,
                }:
                case <-iterator.resultsBuilder.Process.Closing():
                }
            }
        }()
        if iterator.txn.ds.closed {
            iterator.closedEarly = true
            return
        }
        defer iterator.iterator.Close()

        /* MOVE!!! - actually I don't think this is needed
        // this iterator is part of an implicit transaction, so when
        // we're done we must discard the transaction. It's safe to
        // discard the txn because it contains the iterator only.
        if t.implicit {
            defer t.discard()
        }
        */

        // All iterators must be started by rewinding.
        iterator.iterator.Rewind() // todo - remove; the Seek below already positions the iterator
        iterator.iterator.Seek(prefixAsByteArray)

        iterator.scanThroughToOffset(prefixAsByteArray, worker)
        iterator.yieldResults(prefixAsByteArray, worker)
    })

    go iterator.resultsBuilder.Process.CloseAfterChildren() //nolint

    return iterator.resultsBuilder.Results()
}

// scanThroughToOffset advances the iterator until the query offset has been
// consumed, applying any filters before counting an item as skipped.
func (iterator *BadgerIterator) scanThroughToOffset(prefix []byte, worker goprocess.Process) { // we might also not need/use this at all
    // skip to the offset
    for ; iterator.skipped < iterator.query.Offset && iterator.iterator.ValidForPrefix(prefix); iterator.iterator.Next() {
        // On the happy path, we have no filters and we can go
        // on our way.
        if len(iterator.query.Filters) == 0 {
            iterator.skipped++
            continue
        }

        // On the sad path, we need to apply filters before
        // counting the item as "skipped" as the offset comes
        // _after_ the filter.
        item := iterator.iterator.Item()

        matches := true
        check := func(value []byte) error {
            e := dsq.Entry{
                Key:   string(item.Key()),
                Value: value,
                Size:  int(item.ValueSize()), // this function is basically free
            }

            // Only calculate expirations if we need them.
            if iterator.query.ReturnExpirations {
                e.Expiration = expires(item)
            }
            matches = filter(iterator.query.Filters, e)
            return nil
        }

        // Maybe check with the value, only if we need it.
        var err error
        if iterator.query.KeysOnly {
            err = check(nil)
        } else {
            err = item.Value(check)
        }

        if err != nil {
            select {
            case iterator.resultsBuilder.Output <- dsq.Result{Error: err}:
            case <-iterator.txn.ds.closing: // datastore closing.
                iterator.closedEarly = true
                return
            case <-worker.Closing(): // client told us to close early
                return
            }
        }

        if !matches {
            iterator.skipped++
        }
    }
}

// yieldResults sends each remaining entry under the prefix to the results
// builder until the limit is reached, the prefix is exhausted, or the
// datastore/worker is closed.
func (iterator *BadgerIterator) yieldResults(prefix []byte, worker goprocess.Process) {
    for ; iterator.query.Limit <= 0 || iterator.sent < iterator.query.Limit; iterator.iterator.Next() {
        if !iterator.iterator.ValidForPrefix(prefix) {
            return
        }
        item := iterator.iterator.Item()
        e := dsq.Entry{Key: string(item.Key())}

        // Maybe get the value
        var result dsq.Result
        if !iterator.query.KeysOnly {
            b, err := item.ValueCopy(nil)
            if err != nil {
                result = dsq.Result{Error: err}
            } else {
                e.Value = b
                e.Size = len(b)
                result = dsq.Result{Entry: e}
            }
        } else {
            e.Size = int(item.ValueSize())
            result = dsq.Result{Entry: e}
        }

        if iterator.query.ReturnExpirations {
            result.Expiration = expires(item)
        }

        // Finally, filter it (unless we're dealing with an error).
        if result.Error == nil && filter(iterator.query.Filters, e) {
            continue
        }

        select {
        case iterator.resultsBuilder.Output <- result:
            iterator.sent++
        case <-iterator.txn.ds.closing: // datastore closing.
            iterator.closedEarly = true
            return
        case <-worker.Closing(): // client told us to close early
            return
        }
    }
}
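
For context, a minimal usage sketch of the new interfaces. This is not part of the commit: the file name, the countKeysUnder helper, and the assumption that a caller holds a *txn from this package are all illustrative; only GetIterator and IteratePrefix come from the diff above.

// usage_sketch.go (hypothetical, same package as iterator.go)
package badger

import (
    ds "github.com/ipfs/go-datastore"
    dsq "github.com/ipfs/go-datastore/query"
)

// countKeysUnder is a hypothetical helper showing the intended flow:
// obtain an Iterator from a txn, then stream results for a key prefix.
func countKeysUnder(t *txn, prefix ds.Key) (int, error) {
    it, err := t.GetIterator(dsq.Query{Prefix: prefix.String(), KeysOnly: true})
    if err != nil {
        return 0, err
    }

    results := it.IteratePrefix(prefix)
    defer results.Close()

    count := 0
    for result := range results.Next() {
        if result.Error != nil {
            return count, result.Error
        }
        count++
    }
    return count, nil
}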
