Skip to content

Commit

Permalink
WIP - hook up iterable transaction to db system
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Nov 26, 2021
1 parent a7c5f36 commit a463b6b
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 21 deletions.
4 changes: 2 additions & 2 deletions client/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3"
)

type DB interface {
Expand All @@ -37,7 +37,7 @@ type DB interface {
type Sequence interface{}

type Txn interface {
ds.Txn
badgerds.IterableTxn
core.MultiStore
Systemstore() core.DSReaderWriter
IsBatch() bool
Expand Down
5 changes: 3 additions & 2 deletions core/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
package core

import (
ds "github.com/ipfs/go-datastore"
//ds "github.com/ipfs/go-datastore"
ds "github.com/sourcenetwork/defradb/datastores/badger/v3"
)

// Txn is a common interface to the db.Txn struct
type Txn interface {
ds.Txn
ds.IterableTxn
MultiStore
Systemstore() DSReaderWriter
}
10 changes: 10 additions & 0 deletions datastores/badger/v3/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ func (d *Datastore) newImplicitTransaction(readOnly bool) *txn {
return &txn{d, d.DB.NewTransaction(!readOnly), true}
}

func (d *Datastore) NewIterableTransaction(ctx context.Context, readOnly bool) (IterableTxn, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
return nil, ErrClosed
}

return &txn{d, d.DB.NewTransaction(!readOnly), false}, nil
}

func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
Expand Down
14 changes: 14 additions & 0 deletions datastores/badger/v3/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package badger

// this is quite similar in principle to https://github.com/MikkelHJuul/bIter/blob/main/iterator.go that John linked - maybe just use/wrap that
import (
"context"
"fmt"

badger "github.com/dgraph-io/badger/v3"
Expand All @@ -10,6 +11,19 @@ import (
goprocess "github.com/jbenet/goprocess"
)

type IterableTxn interface {
ds.Txn
Iterable
}

// TxnDatastore is an interface that should be implemented by datastores that
// support iterable transactions.
type IterableTxnDatastore interface {
ds.TxnDatastore

NewIterableTransaction(ctx context.Context, readOnly bool) (IterableTxn, error)
}

type Iterable interface {
GetIterator(q dsq.Query) (Iterator, error)
}
Expand Down
37 changes: 30 additions & 7 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"strings"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3"

"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/db/base"
Expand Down Expand Up @@ -54,10 +57,12 @@ type DocumentFetcher struct {
decodedDoc *document.Document
initialized bool

kv *core.KeyValue
kvIter dsq.Results
kvEnd bool
kv *core.KeyValue
kvIter badgerds.Iterator
kvResultsIter dsq.Results
kvEnd bool
// kvIndex int
priorPrefixes map[ds.Key]struct{}

indexKey []byte
}
Expand All @@ -75,6 +80,7 @@ func (df *DocumentFetcher) Init(col *base.CollectionDescription, index *base.Ind
df.initialized = true
df.doc = new(document.EncodedDocument)
df.doc.Schema = &col.Schema
df.priorPrefixes = make(map[ds.Key]struct{})

df.schemaFields = make(map[uint32]base.FieldDescription)
for _, field := range col.Schema.Fields {
Expand Down Expand Up @@ -122,14 +128,29 @@ func (df *DocumentFetcher) Start(ctx context.Context, txn core.Txn, spans core.S
}

var err error
if df.kvIter != nil {
/*if df.kvIter != nil { don't need to do this any more
// If an existing iterator is to be replaced, we must still may sure it is properly closed
err = df.kvIter.Close()
if err != nil {
return err
}
}*/
if df.kvIter == nil { // todo
df.kvIter, err = txn.GetIterator(q)
} else if _, exists := df.priorPrefixes[spans[0].Start().ToDS()]; exists {
df.priorPrefixes = make(map[ds.Key]struct{})
df.kvIter, err = txn.GetIterator(q)
}
df.kvIter, err = txn.Query(ctx, q)
df.priorPrefixes[spans[0].Start().ToDS()] = struct{}{}

if df.kvResultsIter != nil {
err = df.kvResultsIter.Close()
if err != nil {
return err
}
}
df.kvResultsIter = df.kvIter.IteratePrefix(spans[0].Start().ToDS())

if err != nil {
return err
}
Expand Down Expand Up @@ -197,12 +218,14 @@ func (df *DocumentFetcher) nextKey() (docDone bool, err error) {
// - Returns true if the entire iterator/span is exhausted
// - Returns a kv pair instead of internally updating
func (df *DocumentFetcher) nextKV() (iterDone bool, kv *core.KeyValue, err error) {
res, available := df.kvIter.NextSync()
res, available := df.kvResultsIter.NextSync()
if !available {
fmt.Println("!available")
return true, nil, nil
}
err = res.Error
if err != nil {
fmt.Println("err")
return true, nil, err
}

Expand Down Expand Up @@ -350,5 +373,5 @@ func (df *DocumentFetcher) ReadIndexKey(key core.Key) core.Key {
}

func (df *DocumentFetcher) Close() error {
return df.kvIter.Close()
return df.kvResultsIter.Close()
}
30 changes: 20 additions & 10 deletions db/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
badgerds "github.com/sourcenetwork/defradb/datastores/badger/v3"
)

var (
Expand All @@ -38,7 +39,8 @@ var _ client.Txn = (*Txn)(nil)
// If the rootstore is a ds.MemoryStore than it'll only have the Batching
// semantics. With no Commit/Discord functionality
type Txn struct {
ds.Txn
//ds.Txn
badgerds.IterableTxn

// wrapped DS
systemstore core.DSReaderWriter // wrapped txn /system namespace
Expand All @@ -60,15 +62,21 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) {
txn := new(Txn)

// check if our datastore natively supports transactions or Batching
txnStore, ok := db.rootstore.(ds.TxnDatastore)
if ok { // we support transactions
dstxn, err := txnStore.NewTransaction(ctx, readonly)
if iterableTxnStore, ok := db.rootstore.(badgerds.IterableTxnDatastore); ok {
dstxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly)
if err != nil {
return nil, err
}

txn.Txn = dstxn
//txn.Txn = dstxn
txn.IterableTxn = dstxn
} else if txnStore, ok := db.rootstore.(ds.TxnDatastore); ok { // we support transactions
_, err := txnStore.NewTransaction(ctx, readonly)
if err != nil {
return nil, err
}

//txn.IterableTxn = dstxn todo!!!
} else if batchStore, ok := db.rootstore.(ds.Batching); ok { // we support Batching
batcher, err := batchStore.Batch(ctx)
if err != nil {
Expand All @@ -79,8 +87,9 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) {
rb := shimBatcherTxn{
Read: batchStore,
Batch: batcher,
//Iterable: , todo!!
}
txn.Txn = rb
txn.IterableTxn = rb
} else {
// our datastore supports neither TxnDatastore or Batching
// for now return error
Expand All @@ -89,7 +98,7 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) {

// add the wrapped datastores using the existing KeyTransform functions from the db
// @todo Check if KeyTransforms are nil beforehand
shimStore := shimTxnStore{txn.Txn}
shimStore := shimTxnStore{txn.IterableTxn}
txn.systemstore = ktds.Wrap(shimStore, db.ssKeyTransform)
txn.datastore = ktds.Wrap(shimStore, db.dsKeyTransform)
txn.headstore = ktds.Wrap(shimStore, db.hsKeyTransform)
Expand Down Expand Up @@ -120,17 +129,17 @@ func (txn *Txn) DAGstore() core.DAGStore {
}

func (txn *Txn) IsBatch() bool {
_, ok := txn.Txn.(shimBatcherTxn)
_, ok := txn.IterableTxn.(shimBatcherTxn)
return ok
}

// Shim to make ds.Txn support ds.Datastore
type shimTxnStore struct {
ds.Txn
badgerds.IterableTxn
}

func (ts shimTxnStore) Sync(ctx context.Context, prefix ds.Key) error {
return ts.Txn.Commit(ctx)
return ts.IterableTxn.Commit(ctx)
}

func (ts shimTxnStore) Close() error {
Expand All @@ -142,6 +151,7 @@ func (ts shimTxnStore) Close() error {
type shimBatcherTxn struct {
ds.Read
ds.Batch
badgerds.Iterable
}

func (shimBatcherTxn) Discard(_ context.Context) {
Expand Down

0 comments on commit a463b6b

Please sign in to comment.