diff --git a/client/core.go b/client/core.go index 24ab4ea576..18440733d7 100644 --- a/client/core.go +++ b/client/core.go @@ -20,7 +20,7 @@ import ( blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" + badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3" ) type DB interface { @@ -37,7 +37,7 @@ type DB interface { type Sequence interface{} type Txn interface { - ds.Txn + badgerds.IterableTxn core.MultiStore Systemstore() core.DSReaderWriter IsBatch() bool diff --git a/core/txn.go b/core/txn.go index 0edf98a2f1..886c9d4df3 100644 --- a/core/txn.go +++ b/core/txn.go @@ -10,12 +10,13 @@ package core import ( - ds "github.com/ipfs/go-datastore" + //ds "github.com/ipfs/go-datastore" + ds "github.com/sourcenetwork/defradb/datastores/badger/v3" ) // Txn is a common interface to the db.Txn struct type Txn interface { - ds.Txn + ds.IterableTxn MultiStore Systemstore() DSReaderWriter } diff --git a/datastores/badger/v3/datastore.go b/datastores/badger/v3/datastore.go index ac414da230..fbcde489c3 100644 --- a/datastores/badger/v3/datastore.go +++ b/datastores/badger/v3/datastore.go @@ -224,6 +224,16 @@ func (d *Datastore) newImplicitTransaction(readOnly bool) *txn { return &txn{d, d.DB.NewTransaction(!readOnly), true} } +func (d *Datastore) NewIterableTransaction(ctx context.Context, readOnly bool) (IterableTxn, error) { + d.closeLk.RLock() + defer d.closeLk.RUnlock() + if d.closed { + return nil, ErrClosed + } + + return &txn{d, d.DB.NewTransaction(!readOnly), false}, nil +} + func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error { d.closeLk.RLock() defer d.closeLk.RUnlock() diff --git a/datastores/badger/v3/iterator.go b/datastores/badger/v3/iterator.go index 74921b1d9c..205a27eaf2 100644 --- a/datastores/badger/v3/iterator.go +++ b/datastores/badger/v3/iterator.go @@ -2,6 +2,7 @@ package badger // this is quite similar in principle to 
https://github.com/MikkelHJuul/bIter/blob/main/iterator.go that John linked - maybe just use/wrap that import ( + "context" "fmt" badger "github.com/dgraph-io/badger/v3" @@ -10,6 +11,19 @@ import ( goprocess "github.com/jbenet/goprocess" ) +type IterableTxn interface { + ds.Txn + Iterable +} + +// IterableTxnDatastore is an interface that should be implemented by datastores that +// support iterable transactions. +type IterableTxnDatastore interface { + ds.TxnDatastore + + NewIterableTransaction(ctx context.Context, readOnly bool) (IterableTxn, error) +} + type Iterable interface { GetIterator(q dsq.Query) (Iterator, error) } diff --git a/db/fetcher/fetcher.go b/db/fetcher/fetcher.go index 29ea3541b6..3bf1061736 100644 --- a/db/fetcher/fetcher.go +++ b/db/fetcher/fetcher.go @@ -13,10 +13,13 @@ import ( "bytes" "context" "errors" + "fmt" "sort" "strings" + ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" + badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3" "github.com/sourcenetwork/defradb/core" "github.com/sourcenetwork/defradb/db/base" @@ -54,10 +57,12 @@ type DocumentFetcher struct { decodedDoc *document.Document initialized bool - kv *core.KeyValue - kvIter dsq.Results - kvEnd bool + kv *core.KeyValue + kvIter badgerds.Iterator + kvResultsIter dsq.Results + kvEnd bool // kvIndex int + priorPrefixes map[ds.Key]struct{} indexKey []byte } @@ -75,6 +80,7 @@ func (df *DocumentFetcher) Init(col *base.CollectionDescription, index *base.Ind df.initialized = true df.doc = new(document.EncodedDocument) df.doc.Schema = &col.Schema + df.priorPrefixes = make(map[ds.Key]struct{}) df.schemaFields = make(map[uint32]base.FieldDescription) for _, field := range col.Schema.Fields { @@ -122,14 +128,29 @@ func (df *DocumentFetcher) Start(ctx context.Context, txn core.Txn, spans core.S } var err error - if df.kvIter != nil { + /*if df.kvIter != nil { don't need to do this any more // If an existing iterator is to be replaced, we must still may sure it
is properly closed err = df.kvIter.Close() if err != nil { return err } + }*/ + if df.kvIter == nil { // todo + df.kvIter, err = txn.GetIterator(q) + } else if _, exists := df.priorPrefixes[spans[0].Start().ToDS()]; exists { + df.priorPrefixes = make(map[ds.Key]struct{}) + df.kvIter, err = txn.GetIterator(q) } - df.kvIter, err = txn.Query(ctx, q) + df.priorPrefixes[spans[0].Start().ToDS()] = struct{}{} + + if df.kvResultsIter != nil { + err = df.kvResultsIter.Close() + if err != nil { + return err + } + } + df.kvResultsIter = df.kvIter.IteratePrefix(spans[0].Start().ToDS()) + if err != nil { return err } @@ -197,12 +218,14 @@ func (df *DocumentFetcher) nextKey() (docDone bool, err error) { // - Returns true if the entire iterator/span is exhausted // - Returns a kv pair instead of internally updating func (df *DocumentFetcher) nextKV() (iterDone bool, kv *core.KeyValue, err error) { - res, available := df.kvIter.NextSync() + res, available := df.kvResultsIter.NextSync() if !available { + fmt.Println("!available") return true, nil, nil } err = res.Error if err != nil { + fmt.Println("err") return true, nil, err } @@ -350,5 +373,5 @@ func (df *DocumentFetcher) ReadIndexKey(key core.Key) core.Key { } func (df *DocumentFetcher) Close() error { - return df.kvIter.Close() + return df.kvResultsIter.Close() } diff --git a/db/txn.go b/db/txn.go index f161e7606b..b7f6c56267 100644 --- a/db/txn.go +++ b/db/txn.go @@ -19,6 +19,7 @@ import ( ds "github.com/ipfs/go-datastore" ktds "github.com/ipfs/go-datastore/keytransform" + badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3" ) var ( @@ -38,7 +39,8 @@ var _ client.Txn = (*Txn)(nil) // If the rootstore is a ds.MemoryStore than it'll only have the Batching // semantics. 
With no Commit/Discord functionality type Txn struct { - ds.Txn + //ds.Txn + badgerds.IterableTxn // wrapped DS systemstore core.DSReaderWriter // wrapped txn /system namespace @@ -60,15 +62,21 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) { txn := new(Txn) // check if our datastore natively supports transactions or Batching - txnStore, ok := db.rootstore.(ds.TxnDatastore) - if ok { // we support transactions - dstxn, err := txnStore.NewTransaction(ctx, readonly) + if iterableTxnStore, ok := db.rootstore.(badgerds.IterableTxnDatastore); ok { + dstxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly) if err != nil { return nil, err } - txn.Txn = dstxn + //txn.Txn = dstxn + txn.IterableTxn = dstxn + } else if txnStore, ok := db.rootstore.(ds.TxnDatastore); ok { // we support transactions + _, err := txnStore.NewTransaction(ctx, readonly) + if err != nil { + return nil, err + } + //txn.IterableTxn = dstxn todo!!! } else if batchStore, ok := db.rootstore.(ds.Batching); ok { // we support Batching batcher, err := batchStore.Batch(ctx) if err != nil { @@ -79,8 +87,9 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) { rb := shimBatcherTxn{ Read: batchStore, Batch: batcher, + //Iterable: , todo!! 
} - txn.Txn = rb + txn.IterableTxn = rb } else { // our datastore supports neither TxnDatastore or Batching // for now return error @@ -89,7 +98,7 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) { // add the wrapped datastores using the existing KeyTransform functions from the db // @todo Check if KeyTransforms are nil beforehand - shimStore := shimTxnStore{txn.Txn} + shimStore := shimTxnStore{txn.IterableTxn} txn.systemstore = ktds.Wrap(shimStore, db.ssKeyTransform) txn.datastore = ktds.Wrap(shimStore, db.dsKeyTransform) txn.headstore = ktds.Wrap(shimStore, db.hsKeyTransform) @@ -120,17 +129,17 @@ func (txn *Txn) DAGstore() core.DAGStore { } func (txn *Txn) IsBatch() bool { - _, ok := txn.Txn.(shimBatcherTxn) + _, ok := txn.IterableTxn.(shimBatcherTxn) return ok } // Shim to make ds.Txn support ds.Datastore type shimTxnStore struct { - ds.Txn + badgerds.IterableTxn } func (ts shimTxnStore) Sync(ctx context.Context, prefix ds.Key) error { - return ts.Txn.Commit(ctx) + return ts.IterableTxn.Commit(ctx) } func (ts shimTxnStore) Close() error { @@ -142,6 +151,7 @@ func (ts shimTxnStore) Close() error { type shimBatcherTxn struct { ds.Read ds.Batch + badgerds.Iterable } func (shimBatcherTxn) Discard(_ context.Context) {