diff --git a/datastores/badger/v3/iterator.go b/datastores/badger/v3/iterator.go new file mode 100644 index 0000000000..74921b1d9c --- /dev/null +++ b/datastores/badger/v3/iterator.go @@ -0,0 +1,261 @@ +package badger + +// this is quite similar in principle to https://github.com/MikkelHJuul/bIter/blob/main/iterator.go that John linked - maybe just use/wrap that +import ( + "fmt" + + badger "github.com/dgraph-io/badger/v3" + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + goprocess "github.com/jbenet/goprocess" +) + +type Iterable interface { + GetIterator(q dsq.Query) (Iterator, error) +} + +type Iterator interface { + IteratePrefix(prefix ds.Key) dsq.Results +} + +type BadgerIterator struct { + iterator *badger.Iterator + resultsBuilder *dsq.ResultBuilder + query dsq.Query + txn txn + skipped int + sent int + closedEarly bool +} + +func (t *txn) GetIterator(q dsq.Query) (Iterator, error) { + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = !q.KeysOnly + // check that this can be removed! + prefix := ds.NewKey(q.Prefix).String() + if prefix != "/" { + //opt.Prefix = []byte(prefix + "/") + } + + // Handle ordering + if len(q.Orders) > 0 { + switch q.Orders[0].(type) { + case dsq.OrderByKey, *dsq.OrderByKey: + // We order by key by default. + case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending: + // Reverse order by key + opt.Reverse = true + default: + // Ok, we have a weird order we can't handle. Let's + // perform the _base_ query (prefix, filter, etc.), then + // handle sort/offset/limit later. + + // Skip the stuff we can't apply. + baseQuery := q + baseQuery.Limit = 0 + baseQuery.Offset = 0 + baseQuery.Orders = nil + + // perform the base query. + res, err := t.query(baseQuery) + if err != nil { + return nil, err + } + + // fix the query + res = dsq.ResultsReplaceQuery(res, q) + + // Remove the parts we've already applied. + naiveQuery := q + naiveQuery.Prefix = "" + naiveQuery.Filters = nil + panic("todo") + // Apply the rest of the query + //return dsq.NaiveQueryApply(naiveQuery, res), nil //todo + } + } + + badgerIterator := t.txn.NewIterator(opt) + badgerIterator.Rewind() + //badgerIterator.Rewind() + //qrb := dsq.NewResultBuilder(q) + + //go qrb.Process.CloseAfterChildren() //nolint + + iterator := BadgerIterator{ + iterator: badgerIterator, + // resultsBuilder: qrb, + txn: *t, + } + + return &iterator, nil +} + +func (iterator *BadgerIterator) IteratePrefix(prefix ds.Key) dsq.Results { + fmt.Println("IteratePrefix") + prefixString := prefix.String() + if prefixString != "/" { + prefixString = prefixString + "/" + } + prefixAsByteArray := []byte(prefixString) + + iterator.resultsBuilder = dsq.NewResultBuilder(iterator.query) + + iterator.resultsBuilder.Process.Go(func(worker goprocess.Process) { + iterator.txn.ds.closeLk.RLock() + iterator.closedEarly = false + defer func() { + iterator.txn.ds.closeLk.RUnlock() + if iterator.closedEarly { + select { + case iterator.resultsBuilder.Output <- dsq.Result{ + Error: ErrClosed, + }: + case <-iterator.resultsBuilder.Process.Closing(): + } + } + + }() + if iterator.txn.ds.closed { + iterator.closedEarly = true + return + } + defer iterator.iterator.Close() + /* MOVE!!! - actually I dont think this is needed + // this iterator is part of an implicit transaction, so when + // we're done we must discard the transaction. It's safe to + // discard the txn it because it contains the iterator only. + if t.implicit { + defer t.discard() + } + + */ + // All iterators must be started by rewinding. + iterator.iterator.Rewind() //todo - remove + iterator.iterator.Seek(prefixAsByteArray) + + iterator.scanThroughToOffset(prefixAsByteArray, worker) + iterator.yieldResults(prefixAsByteArray, worker) + }) + + go iterator.resultsBuilder.Process.CloseAfterChildren() //nolint + + return iterator.resultsBuilder.Results() +} + +func (iterator *BadgerIterator) scanThroughToOffset(prefix []byte, worker goprocess.Process) { // we might also not need/use this at all + // skip to the offset + for _ = 0; iterator.skipped < iterator.query.Offset && iterator.iterator.ValidForPrefix(prefix); iterator.iterator.Next() { + // On the happy path, we have no filters and we can go + // on our way. + fmt.Println("skipping") + if len(iterator.query.Filters) == 0 { + iterator.skipped++ + continue + } + + // On the sad path, we need to apply filters before + // counting the item as "skipped" as the offset comes + // _after_ the filter. + item := iterator.iterator.Item() + + matches := true + check := func(value []byte) error { + e := dsq.Entry{ + Key: string(item.Key()), + Value: value, + Size: int(item.ValueSize()), // this function is basically free + } + + // Only calculate expirations if we need them. + if iterator.query.ReturnExpirations { + e.Expiration = expires(item) + } + matches = filter(iterator.query.Filters, e) + return nil + } + + // Maybe check with the value, only if we need it. + var err error + if iterator.query.KeysOnly { + err = check(nil) + } else { + err = item.Value(check) + } + + if err != nil { + select { + case iterator.resultsBuilder.Output <- dsq.Result{Error: err}: + case <-iterator.txn.ds.closing: // datastore closing. + iterator.closedEarly = true + return + case <-worker.Closing(): // client told us to close early + return + } + } + + if !matches { + iterator.skipped++ + } + } +} + +func (iterator *BadgerIterator) yieldResults(prefix []byte, worker goprocess.Process) { + //iterator.iterator.Next() + fmt.Println("prefix") + fmt.Println(string(prefix)) + for _ = 0; iterator.query.Limit <= 0 || iterator.sent < iterator.query.Limit; iterator.iterator.Next() { + if !iterator.iterator.Valid() { + fmt.Println("invalid") + //fmt.Println(iterator.iterator.Item()) //item is nil + } + if !iterator.iterator.ValidForPrefix(prefix) { + fmt.Println("invalid for prefix") + return + } + item := iterator.iterator.Item() + e := dsq.Entry{Key: string(item.Key())} + + // Maybe get the value + var result dsq.Result + if !iterator.query.KeysOnly { + fmt.Println("yielding") + b, err := item.ValueCopy(nil) + if err != nil { + fmt.Println(err) + result = dsq.Result{Error: err} + } else { + e.Value = b + e.Size = len(b) + result = dsq.Result{Entry: e} + fmt.Println(e) + } + } else { + e.Size = int(item.ValueSize()) + result = dsq.Result{Entry: e} + } + + if iterator.query.ReturnExpirations { + result.Expiration = expires(item) + } + + // Finally, filter it (unless we're dealing with an error). + if result.Error == nil && filter(iterator.query.Filters, e) { + fmt.Println("filtered") + continue + } + + select { + case iterator.resultsBuilder.Output <- result: + iterator.sent++ + case <-iterator.txn.ds.closing: // datastore closing. + iterator.closedEarly = true + fmt.Println("closing early") + return + case <-worker.Closing(): // client told us to close early + fmt.Println("closing") + return + } + } + fmt.Println("yieldResults done") +}