diff --git a/bindings/go/src/_stacktester/directory.go b/bindings/go/src/_stacktester/directory.go index 291e7a000d6..d80da235018 100644 --- a/bindings/go/src/_stacktester/directory.go +++ b/bindings/go/src/_stacktester/directory.go @@ -22,6 +22,7 @@ package main import ( "bytes" + "context" "strings" "github.com/apple/foundationdb/bindings/go/src/fdb" @@ -105,7 +106,7 @@ var createOps = map[string]bool{ "OPEN_SUBSPACE": true, } -func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, idx int, t fdb.Transactor, rt fdb.ReadTransactor) { +func (de *DirectoryExtension) processOp(ctx context.Context, sm *StackMachine, op string, isDB bool, idx int, t fdb.Transactor, rt fdb.ReadTransactor) { defer func() { if r := recover(); r != nil { sm.store(idx, []byte("DIRECTORY_ERROR")) @@ -142,7 +143,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, if l != nil { layer = l.([]byte) } - d, err := de.cwd().CreateOrOpen(t, tupleToPath(tuples[0]), layer) + d, err := de.cwd().CreateOrOpen(ctx, t, tupleToPath(tuples[0]), layer) if err != nil { panic(err) } @@ -157,10 +158,10 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, p := sm.waitAndPop().item var d directory.Directory if p == nil { - d, err = de.cwd().Create(t, tupleToPath(tuples[0]), layer) + d, err = de.cwd().Create(ctx, t, tupleToPath(tuples[0]), layer) } else { // p.([]byte) itself may be nil, but CreatePrefix handles that appropriately - d, err = de.cwd().CreatePrefix(t, tupleToPath(tuples[0]), layer, p.([]byte)) + d, err = de.cwd().CreatePrefix(ctx, t, tupleToPath(tuples[0]), layer, p.([]byte)) } if err != nil { panic(err) @@ -173,7 +174,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, if l != nil { layer = l.([]byte) } - d, err := de.cwd().Open(rt, tupleToPath(tuples[0]), layer) + d, err := de.cwd().Open(ctx, rt, tupleToPath(tuples[0]), layer) if err != nil { panic(err) } @@ -188,14 +189,14 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, de.errorIndex = sm.waitAndPop().item.(int64) case op == "MOVE": tuples := sm.popTuples(2) - d, err := de.cwd().Move(t, tupleToPath(tuples[0]), tupleToPath(tuples[1])) + d, err := de.cwd().Move(ctx, t, tupleToPath(tuples[0]), tupleToPath(tuples[1])) if err != nil { panic(err) } de.store(d) case op == "MOVE_TO": tuples := sm.popTuples(1) - d, err := de.cwd().MoveTo(t, tupleToPath(tuples[0])) + d, err := de.cwd().MoveTo(ctx, t, tupleToPath(tuples[0])) if err != nil { panic(err) } @@ -208,8 +209,8 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, // doesn't end up committing the version key. (Other languages have // separate remove() and remove_if_exists() so don't have this tricky // issue). 
- _, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { - ok, err := de.cwd().Remove(tr, path) + _, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + ok, err := de.cwd().Remove(ctx, tr, path) if err != nil { panic(err) } @@ -225,8 +226,9 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, if err != nil { panic(err) } + txClose() case op == "LIST": - subs, err := de.cwd().List(rt, sm.maybePath()) + subs, err := de.cwd().List(ctx, rt, sm.maybePath()) if err != nil { panic(err) } @@ -236,7 +238,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, } sm.store(idx, t.Pack()) case op == "EXISTS": - b, err := de.cwd().Exists(rt, sm.maybePath()) + b, err := de.cwd().Exists(ctx, rt, sm.maybePath()) if err != nil { panic(err) } @@ -275,10 +277,14 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, k := sm.waitAndPop().item.([]byte) k = append(k, tuple.Tuple{de.index}.Pack()...) v := de.css().Bytes() - t.Transact(func(tr fdb.Transaction) (interface{}, error) { + _, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { tr.Set(fdb.Key(k), v) return nil, nil }) + if err != nil { + panic(err) + } + txClose() case op == "LOG_DIRECTORY": rp := sm.waitAndPop().item.([]byte) ss := subspace.FromBytes(rp).Sub(de.index) @@ -288,7 +294,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, v2 := tuple.Tuple{de.cwd().GetLayer()}.Pack() k3 := ss.Pack(tuple.Tuple{"exists"}) var v3 []byte - exists, err := de.cwd().Exists(rt, nil) + exists, err := de.cwd().Exists(ctx, rt, nil) if err != nil { panic(err) } @@ -300,19 +306,23 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, k4 := ss.Pack(tuple.Tuple{"children"}) var subs []string if exists { - subs, err = de.cwd().List(rt, nil) + subs, err = de.cwd().List(ctx, rt, nil) if err != nil { panic(err) } } v4 := tuplePackStrings(subs) - t.Transact(func(tr fdb.Transaction) (interface{}, error) { + _, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { tr.Set(k1, v1) tr.Set(k2, v2) tr.Set(k3, v3) tr.Set(k4, v4) return nil, nil }) + if err != nil { + panic(err) + } + txClose() case op == "STRIP_PREFIX": ba := sm.waitAndPop().item.([]byte) ssb := de.css().Bytes() diff --git a/bindings/go/src/_stacktester/stacktester.go b/bindings/go/src/_stacktester/stacktester.go index 989eca600d9..3ac02a4bdf0 100644 --- a/bindings/go/src/_stacktester/stacktester.go +++ b/bindings/go/src/_stacktester/stacktester.go @@ -22,6 +22,7 @@ package main import ( "bytes" + "context" "encoding/binary" "encoding/hex" "fmt" @@ -242,11 +243,12 @@ func (sm *StackMachine) dumpStack() { } } -func (sm *StackMachine) executeMutation(t fdb.Transactor, f func(fdb.Transaction) (interface{}, error), isDB bool, idx int) { - _, err := t.Transact(f) +func (sm *StackMachine) executeMutation(ctx context.Context, t fdb.Transactor, f func(fdb.Transaction) (interface{}, error), isDB bool, idx int) { + _, txClose, err := t.Transact(ctx, f) if err != nil { panic(err) } + txClose() if isDB { sm.store(idx, []byte("RESULT_NOT_PRESENT")) } @@ -277,9 +279,9 @@ func (sm *StackMachine) checkWatches(watches [4]fdb.FutureNil, expected bool) bo return true } -func (sm *StackMachine) testWatches() { +func (sm *StackMachine) testWatches(ctx context.Context) { for { - _, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) { + _, txClose, err := db.Transact(ctx, func(tr 
fdb.Transaction) (interface{}, error) { tr.Set(fdb.Key("w0"), []byte("0")) tr.Set(fdb.Key("w2"), []byte("2")) tr.Set(fdb.Key("w3"), []byte("3")) @@ -288,10 +290,11 @@ func (sm *StackMachine) testWatches() { if err != nil { panic(err) } + txClose() var watches [4]fdb.FutureNil - _, err = db.Transact(func(tr fdb.Transaction) (interface{}, error) { + _, txClose, err = db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { watches[0] = tr.Watch(fdb.Key("w0")) watches[1] = tr.Watch(fdb.Key("w1")) watches[2] = tr.Watch(fdb.Key("w2")) @@ -304,6 +307,7 @@ func (sm *StackMachine) testWatches() { if err != nil { panic(err) } + defer txClose() time.Sleep(5 * time.Second) @@ -311,7 +315,7 @@ func (sm *StackMachine) testWatches() { continue } - _, err = db.Transact(func(tr fdb.Transaction) (interface{}, error) { + _, txClose, err = db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { tr.Set(fdb.Key("w0"), []byte("a")) tr.Set(fdb.Key("w1"), []byte("b")) tr.Clear(fdb.Key("w2")) @@ -321,6 +325,7 @@ func (sm *StackMachine) testWatches() { if err != nil { panic(err) } + defer txClose() if sm.checkWatches(watches, true) { return @@ -328,8 +333,8 @@ func (sm *StackMachine) testWatches() { } } -func (sm *StackMachine) testLocality() { - _, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) { +func (sm *StackMachine) testLocality(ctx context.Context) { + _, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { tr.Options().SetTimeout(60 * 1000) tr.Options().SetReadSystemKeys() boundaryKeys, err := db.LocalityGetBoundaryKeys(fdb.KeyRange{fdb.Key(""), fdb.Key("\xff\xff")}, 0, 0) @@ -364,10 +369,11 @@ func (sm *StackMachine) testLocality() { if err != nil { panic(err) } + txClose() } -func (sm *StackMachine) logStack(entries map[int]stackEntry, prefix []byte) { - _, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) { +func (sm *StackMachine) logStack(ctx context.Context, entries map[int]stackEntry, prefix []byte) { + _, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { for index, el := range entries { var keyt tuple.Tuple keyt = append(keyt, int64(index)) @@ -388,10 +394,11 @@ func (sm *StackMachine) logStack(entries map[int]stackEntry, prefix []byte) { return nil, nil }) - if err != nil { panic(err) } + txClose() + return } @@ -436,7 +443,7 @@ func (sm *StackMachine) switchTransaction(name []byte) { } } -func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { +func (sm *StackMachine) processInst(ctx context.Context, idx int, inst tuple.Tuple) { defer func() { if r := recover(); r != nil { switch r := r.(type) { @@ -531,7 +538,7 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { case op == "ON_ERROR": sm.store(idx, sm.currentTransaction().OnError(fdb.Error{int(sm.waitAndPop().item.(int64))})) case op == "GET_READ_VERSION": - _, err = rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + _, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { sm.lastVersion = rtr.GetReadVersion().MustGet() sm.store(idx, []byte("GOT_READ_VERSION")) return nil, nil @@ -539,10 +546,11 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { if err != nil { panic(err) } + txClose() case op == "SET": key := fdb.Key(sm.waitAndPop().item.([]byte)) value := sm.waitAndPop().item.([]byte) - sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) { + sm.executeMutation(ctx, t, func(tr fdb.Transaction) (interface{}, error) { 
tr.Set(key, value) return nil, nil }, isDB, idx) @@ -553,25 +561,26 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { for len(sm.stack) > 0 { entries[len(sm.stack)-1] = sm.waitAndPop() if len(entries) == 100 { - sm.logStack(entries, prefix) + sm.logStack(ctx, entries, prefix) entries = make(map[int]stackEntry) } } - sm.logStack(entries, prefix) + sm.logStack(ctx, entries, prefix) case op == "GET": key := fdb.Key(sm.waitAndPop().item.([]byte)) - res, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + res, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { return rtr.Get(key), nil }) if err != nil { panic(err) } + txClose() sm.store(idx, res.(fdb.FutureByteSlice)) case op == "GET_ESTIMATED_RANGE_SIZE": r := sm.popKeyRange() - _, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + _, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { _ = rtr.GetEstimatedRangeSizeBytes(r).MustGet() sm.store(idx, []byte("GOT_ESTIMATED_RANGE_SIZE")) return nil, nil @@ -579,10 +588,11 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { if err != nil { panic(err) } + txClose() case op == "GET_RANGE_SPLIT_POINTS": r := sm.popKeyRange() chunkSize := sm.waitAndPop().item.(int64) - _, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + _, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { _ = rtr.GetRangeSplitPoints(r, chunkSize).MustGet() sm.store(idx, []byte("GOT_RANGE_SPLIT_POINTS")) return nil, nil @@ -590,13 +600,14 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { if err != nil { panic(err) } + txClose() case op == "COMMIT": sm.store(idx, sm.currentTransaction().Commit()) case op == "RESET": sm.currentTransaction().Reset() case op == "CLEAR": key := fdb.Key(sm.waitAndPop().item.([]byte)) - sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) { + sm.executeMutation(ctx, t, func(tr fdb.Transaction) (interface{}, error) { tr.Clear(key) return nil, nil }, isDB, idx) @@ -619,12 +630,13 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { case op == "GET_KEY": sel := sm.popSelector() prefix := sm.waitAndPop().item.([]byte) - res, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + res, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { return rtr.GetKey(sel).MustGet(), nil }) if err != nil { panic(err) } + txClose() key := res.(fdb.Key) @@ -657,12 +669,13 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { prefix = sm.waitAndPop().item.([]byte) } - res, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { - return rtr.GetRange(r, ro).GetSliceOrPanic(), nil + res, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { + return rtr.GetRange(r, ro).MustGet(), nil }) if err != nil { panic(err) } + txClose() sm.pushRange(idx, res.([]fdb.KeyValue), prefix) case strings.HasPrefix(op, "CLEAR_RANGE"): @@ -675,7 +688,7 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { er = sm.popKeyRange() } - sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) { + sm.executeMutation(ctx, t, func(tr fdb.Transaction) (interface{}, error) { tr.ClearRange(er) return nil, nil }, isDB, idx) @@ -758,7 +771,7 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { newsm := 
newStackMachine(sm.waitAndPop().item.([]byte), verbose) sm.threads.Add(1) go func() { - newsm.Run() + newsm.Run(ctx) sm.threads.Done() }() case op == "WAIT_EMPTY": @@ -767,13 +780,17 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { if err != nil { panic(err) } - db.Transact(func(tr fdb.Transaction) (interface{}, error) { - v := tr.GetRange(er, fdb.RangeOptions{}).GetSliceOrPanic() + _, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + v := tr.GetRange(er, fdb.RangeOptions{}).MustGet() if len(v) != 0 { panic(fdb.Error{1020}) } return nil, nil }) + if err != nil { + panic(err) + } + txClose() sm.store(idx, []byte("WAITED_FOR_EMPTY")) case op == "READ_CONFLICT_RANGE": err = sm.currentTransaction().AddReadConflictRange(fdb.KeyRange{fdb.Key(sm.waitAndPop().item.([]byte)), fdb.Key(sm.waitAndPop().item.([]byte))}) @@ -804,7 +821,7 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { key := fdb.Key(sm.waitAndPop().item.([]byte)) ival := sm.waitAndPop().item value := ival.([]byte) - sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) { + sm.executeMutation(ctx, t, func(tr fdb.Transaction) (interface{}, error) { reflect.ValueOf(tr).MethodByName(opname).Call([]reflect.Value{reflect.ValueOf(key), reflect.ValueOf(value)}) return nil, nil }, isDB, idx) @@ -855,7 +872,7 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { } fdb.MustAPIVersion(apiVersion) - _, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) { + _, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { tr.Options().SetPrioritySystemImmediate() tr.Options().SetPriorityBatch() tr.Options().SetCausalReadRisky() @@ -876,16 +893,16 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { return tr.Get(fdb.Key("\xff")).MustGet(), nil }) - if err != nil { panic(err) } + txClose() - sm.testWatches() - sm.testLocality() + sm.testWatches(ctx) + sm.testLocality(ctx) case strings.HasPrefix(op, "DIRECTORY_"): - sm.de.processOp(sm, op[10:], isDB, idx, t, rt) + sm.de.processOp(ctx, sm, op[10:], isDB, idx, t, rt) default: log.Fatalf("Unhandled operation %s\n", string(inst[0].([]byte))) } @@ -899,13 +916,14 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { runtime.Gosched() } -func (sm *StackMachine) Run() { - r, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) { - return tr.GetRange(tuple.Tuple{sm.prefix}, fdb.RangeOptions{}).GetSliceOrPanic(), nil +func (sm *StackMachine) Run(ctx context.Context) { + r, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + return tr.GetRange(tuple.Tuple{sm.prefix}, fdb.RangeOptions{}).MustGet(), nil }) if err != nil { panic(err) } + txClose() instructions := r.([]fdb.KeyValue) @@ -915,7 +933,7 @@ func (sm *StackMachine) Run() { if sm.verbose { fmt.Printf("Instruction %d\n", i) } - sm.processInst(i, inst) + sm.processInst(ctx, i, inst) } sm.threads.Wait() @@ -956,7 +974,9 @@ func main() { log.Fatal(err) } + ctx := context.Background() + sm := newStackMachine(prefix, verbose) - sm.Run() + sm.Run(ctx) } diff --git a/bindings/go/src/fdb/database.go b/bindings/go/src/fdb/database.go index b30e93e6c85..c2ab339d766 100644 --- a/bindings/go/src/fdb/database.go +++ b/bindings/go/src/fdb/database.go @@ -27,8 +27,8 @@ package fdb import "C" import ( + "context" "errors" - "runtime" ) // ErrMultiVersionClientUnavailable is returned when the multi-version client API is unavailable. 
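The stacktester hunks above already follow the new Transactor contract that the database.go hunks below define: Transact and ReadTransact take a context.Context and return an extra close function that must be called once every future created inside the transaction has been used. A minimal caller sketch of that contract, not part of the patch; the key name is illustrative and an opened database is assumed:

package example

import (
	"context"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

// readOne resolves a single key under the updated ReadTransact contract.
func readOne(ctx context.Context, db fdb.Database) ([]byte, error) {
	res, txClose, err := db.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) {
		// Returning the future is fine here; it stays valid until txClose is called.
		return rtr.Get(fdb.Key("illustrative-key")), nil
	})
	if err != nil {
		// txClose is nil whenever ReadTransact returns an error.
		return nil, err
	}
	defer txClose() // deferred, so it runs only after the future below is resolved
	return res.(fdb.FutureByteSlice).Get(ctx)
}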
@@ -92,6 +92,7 @@ func (opt DatabaseOptions) setOpt(code int, param []byte) error { // preferable to use the (Database).Transact method, which handles // automatically creating and committing a transaction with appropriate retry // behavior. +// Close() must be called on the returned transaction to avoid a memory leak. func (d Database) CreateTransaction() (Transaction, error) { var outt *C.FDBTransaction @@ -100,10 +101,6 @@ func (d Database) CreateTransaction() (Transaction, error) { } t := &transaction{outt, d} - // transactions cannot be destroyed explicitly if any future is still potentially used - // thus the GC is used to figure out when all Go wrapper objects for futures have gone out of scope, - // making the transaction ready to be garbage-collected. - runtime.SetFinalizer(t, (*transaction).destroy) return Transaction{t}, nil } @@ -115,22 +112,22 @@ func (d Database) CreateTransaction() (Transaction, error) { // process address is the form of IP:Port pair without the :tls suffix if the cluster is running // with TLS enabled. The address can also be multiple processes addresses concated by a comma, e.g. // "IP1:Port,IP2:port", in this case the RebootWorker will reboot all provided addresses concurrently. -func (d Database) RebootWorker(address string, checkFile bool, suspendDuration int) error { +func (d Database) RebootWorker(ctx context.Context, address string, checkFile bool, suspendDuration int) error { + f := newFuture(C.fdb_database_reboot_worker( + d.ptr, + byteSliceToPtr([]byte(address)), + C.int(len(address)), + C.fdb_bool_t(boolToInt(checkFile)), + C.int(suspendDuration), + ), + ) + defer f.Close() + t := &futureInt64{ - future: newFutureWithDb( - d.database, - nil, - C.fdb_database_reboot_worker( - d.ptr, - byteSliceToPtr([]byte(address)), - C.int(len(address)), - C.fdb_bool_t(boolToInt(checkFile)), - C.int(suspendDuration), - ), - ), + future: f, } - dbVersion, err := t.Get() + dbVersion, err := t.Get(ctx) if dbVersion == 0 { return errors.New("failed to send reboot process request") @@ -142,16 +139,19 @@ func (d Database) RebootWorker(address string, checkFile bool, suspendDuration i // GetClientStatus returns a JSON byte slice containing database client-side status information. // At the top level the report describes the status of the Multi-Version Client database - its initialization state, the protocol version, the available client versions; it also embeds the status of the actual version-specific database and within it the addresses of various FDB server roles the client is aware of and their connection status. // NOTE: ErrMultiVersionClientUnavailable will be returned if the Multi-Version client API was not enabled. 
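A short fragment using the context-aware signature introduced in the hunk below; a sketch only, assuming the db, ctx, and imports of the first sketch and that it runs inside a function returning an error:

status, err := db.GetClientStatus(ctx)
if err != nil {
	// Includes ErrMultiVersionClientUnavailable when the multi-version client API is not enabled.
	return err
}
// status is a JSON byte slice describing the multi-version client and the
// wrapped version-specific database.
_ = status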
-func (d Database) GetClientStatus() ([]byte, error) { +func (d Database) GetClientStatus(ctx context.Context) ([]byte, error) { if apiVersion == 0 { return nil, errAPIVersionUnset } + f := newFuture(C.fdb_database_get_client_status(d.ptr)) + defer f.Close() + st := &futureByteSlice{ - future: newFutureWithDb(d.database, nil, C.fdb_database_get_client_status(d.ptr)), + future: f, } - b, err := st.Get() + b, err := st.Get(ctx) if err != nil { return nil, err } @@ -162,20 +162,52 @@ func (d Database) GetClientStatus() ([]byte, error) { return b, nil } -func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNil) (ret interface{}, err error) { +func autoCancel(ctx context.Context, tr CancellableTransaction) chan<- struct{} { + exitCh := make(chan struct{}) + // This goroutine ensures that as soon as context is canceled the FoundationDB transaction will be canceled as well; + // this can in turn cause FoundationDB's `transaction_canceled` errors. + go func() { + select { + case <-exitCh: + // transaction finished before any context cancellation was acknowledged + return + case <-ctx.Done(): + // context cancellation happened before transaction finished + // NOTE: canceling the transaction does not mean that transaction state changes did not happen, + + tr.Cancel() + + return + } + }() + + return exitCh +} + +// retryable is responsible for retrying the wrapped function for as long as the returned error is retryable. +// In case of a non-retryable error, it is returned and transaction is closed. +func retryable(ctx context.Context, tr Transaction, wrapped func() (interface{}, error)) (ret interface{}, txClose func(), err error) { + // NOTE: 'autoCancel' must be called outside of the defer function, so that the goroutine is started + ch := autoCancel(ctx, tr) + defer close(ch) + for { ret, err = wrapped() - - // No error means success! if err == nil { + // No error means success! + // Caller is responsible for closing transaction after finishing using any future created within the transaction + // This return value is always nil in case of errors + txClose = tr.Close + return } - // Check if the error chain contains an - // fdb.Error + // Check if the error chain contains an fdb.Error var ep Error if errors.As(err, &ep) { - processedErr := onError(ep).Get() + f := tr.OnError(ep) + processedErr := f.Get(ctx) + f.Close() var newEp Error if !errors.As(processedErr, &newEp) || newEp.Code != ep.Code { // override original error only if not an Error or code changed @@ -187,6 +219,10 @@ func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNi // If OnError returns an error, then it's not // retryable; otherwise take another pass at things if err != nil { + + // destroy transaction; this will cancel all futures created within it + tr.Close() + return } } @@ -204,36 +240,37 @@ func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNi // error. // // The transaction is retried if the error is or wraps a retryable Error. -// The error is unwrapped. // -// Do not return Future objects from the function provided to Transact. The -// Transaction created by Transact may be finalized at any point after Transact -// returns, resulting in the cancellation of any outstanding -// reads. Additionally, any errors returned or panicked by the Future will no -// longer be able to trigger a retry of the caller-provided function. +// In case of success a transaction close function is returned and caller must +// call it after all futures have been used. 
+// Any errors returned or panicked by the Future outside of the Transact() call +// will no longer be able to trigger a retry of the caller-provided function. // // See the Transactor interface for an example of using Transact with // Transaction and Database objects. -func (d Database) Transact(f func(Transaction) (interface{}, error)) (interface{}, error) { +func (d Database) Transact(ctx context.Context, f func(Transaction) (interface{}, error)) (interface{}, func(), error) { tr, err := d.CreateTransaction() - // Any error here is non-retryable if err != nil { - return nil, err + // Any error here is non-retryable + return nil, nil, err } wrapped := func() (ret interface{}, err error) { defer panicToError(&err) ret, err = f(tr) - - if err == nil { - err = tr.Commit().Get() + if err != nil { + return } + f := tr.Commit() + err = f.Get(ctx) + f.Close() + return } - return retryable(wrapped, tr.OnError) + return retryable(ctx, tr, wrapped) } // ReadTransact runs a caller-provided function inside a retry loop, providing @@ -247,23 +284,20 @@ func (d Database) Transact(f func(Transaction) (interface{}, error)) (interface{ // transaction or return the error. // // The transaction is retried if the error is or wraps a retryable Error. -// The error is unwrapped. -// Read transactions are never committed and destroyed automatically via GC, -// once all their futures go out of scope. +// Read transactions are never committed. // -// Do not return Future objects from the function provided to ReadTransact. The -// Transaction created by ReadTransact may be finalized at any point after -// ReadTransact returns, resulting in the cancellation of any outstanding -// reads. Additionally, any errors returned or panicked by the Future will no -// longer be able to trigger a retry of the caller-provided function. +// In case of success a transaction close function is returned and caller must +// call it after all futures have been used. +// Any errors returned or panicked by the Future outside of the Transact() call +// will no longer be able to trigger a retry of the caller-provided function. // // See the ReadTransactor interface for an example of using ReadTransact with // Transaction, Snapshot and Database objects. -func (d Database) ReadTransact(f func(ReadTransaction) (interface{}, error)) (interface{}, error) { +func (d Database) ReadTransact(ctx context.Context, f func(ReadTransaction) (interface{}, error)) (interface{}, func(), error) { tr, err := d.CreateTransaction() if err != nil { // Any error here is non-retryable - return nil, err + return nil, nil, err } wrapped := func() (ret interface{}, err error) { @@ -271,13 +305,12 @@ func (d Database) ReadTransact(f func(ReadTransaction) (interface{}, error)) (in ret, err = f(tr) - // read-only transactions are not committed and will be destroyed automatically via GC, - // once all the futures go out of scope + // read-only transactions are not committed and will be destroyed automatically when Transaction's Close() is called return } - return retryable(wrapped, tr.OnError) + return retryable(ctx, tr, wrapped) } // Options returns a DatabaseOptions instance suitable for setting options @@ -297,11 +330,12 @@ func (d Database) Options() DatabaseOptions { // // If readVersion is non-zero, the boundary keys as of readVersion will be // returned. 
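The autoCancel helper above ties the transaction's lifetime to the caller's context, so a caller can bound a transaction with a deadline. A sketch of that pattern, not part of the patch; the timeout and key are illustrative, with db and imports as in the first sketch plus "time":

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) {
	tr.Set(fdb.Key("illustrative-key"), []byte("value"))
	return nil, nil
})
if err != nil {
	// If ctx expired mid-flight, this may wrap a transaction_canceled fdb.Error.
	return err
}
txClose()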
-func (d Database) LocalityGetBoundaryKeys(er ExactRange, limit int, readVersion int64) ([]Key, error) { +func (d Database) LocalityGetBoundaryKeys(ctx context.Context, er ExactRange, limit int, readVersion int64) ([]Key, error) { tr, err := d.CreateTransaction() if err != nil { return nil, err } + defer tr.Close() if readVersion != 0 { tr.SetReadVersion(readVersion) @@ -316,7 +350,7 @@ func (d Database) LocalityGetBoundaryKeys(er ExactRange, limit int, readVersion append(Key("\xFF/keyServers/"), ek.FDBKey()...), } - kvs, err := tr.Snapshot().GetRange(ffer, RangeOptions{Limit: limit}).GetSliceWithError() + kvs, err := tr.Snapshot().GetRange(ffer, RangeOptions{Limit: limit}).Get(ctx) if err != nil { return nil, err } diff --git a/bindings/go/src/fdb/directory/allocator.go b/bindings/go/src/fdb/directory/allocator.go index db7657e3c84..0953ece6e9d 100644 --- a/bindings/go/src/fdb/directory/allocator.go +++ b/bindings/go/src/fdb/directory/allocator.go @@ -24,6 +24,7 @@ package directory import ( "bytes" + "context" "encoding/binary" "math/rand" "sync" @@ -63,10 +64,10 @@ func windowSize(start int64) int64 { return 8192 } -func (hca highContentionAllocator) allocate(tr fdb.Transaction, s subspace.Subspace) (subspace.Subspace, error) { +func (hca highContentionAllocator) allocate(ctx context.Context, tr fdb.Transaction, s subspace.Subspace) (subspace.Subspace, error) { for { rr := tr.Snapshot().GetRange(hca.counters, fdb.RangeOptions{Limit: 1, Reverse: true}) - kvs, err := rr.GetSliceWithError() + kvs, err := rr.Get(ctx) if err != nil { return nil, err } @@ -95,10 +96,11 @@ func (hca highContentionAllocator) allocate(tr fdb.Transaction, s subspace.Subsp // Increment the allocation count for the current window tr.Add(hca.counters.Sub(start), oneBytes) countFuture := tr.Snapshot().Get(hca.counters.Sub(start)) + defer countFuture.Close() allocatorMutex.Unlock() - countStr, err := countFuture.Get() + countStr, err := countFuture.Get(ctx) if err != nil { return nil, err } @@ -134,12 +136,13 @@ func (hca highContentionAllocator) allocate(tr fdb.Transaction, s subspace.Subsp latestCounter := tr.Snapshot().GetRange(hca.counters, fdb.RangeOptions{Limit: 1, Reverse: true}) candidateValue := tr.Get(key) + defer candidateValue.Close() tr.Options().SetNextWriteNoWriteConflictRange() tr.Set(key, []byte("")) allocatorMutex.Unlock() - kvs, err = latestCounter.GetSliceWithError() + kvs, err = latestCounter.Get(ctx) if err != nil { return nil, err } @@ -154,7 +157,7 @@ func (hca highContentionAllocator) allocate(tr fdb.Transaction, s subspace.Subsp } } - v, err := candidateValue.Get() + v, err := candidateValue.Get(ctx) if err != nil { return nil, err } diff --git a/bindings/go/src/fdb/directory/directory.go b/bindings/go/src/fdb/directory/directory.go index 613d4435f18..7e570aa6511 100644 --- a/bindings/go/src/fdb/directory/directory.go +++ b/bindings/go/src/fdb/directory/directory.go @@ -40,6 +40,7 @@ package directory import ( + "context" "errors" "github.com/apple/foundationdb/bindings/go/src/fdb" @@ -79,7 +80,7 @@ type Directory interface { // recorded as the layer; if layer is specified and the directory already // exists, it is compared against the layer specified when the directory was // created, and an error is returned if they differ. 
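Every Directory method below now takes a leading context.Context. A usage sketch through the package-level helper defined further down in this diff; the path elements are illustrative, and db, ctx, and the directory/tuple imports are assumptions carried over from the first sketch:

dir, err := directory.CreateOrOpen(ctx, db, []string{"application", "users"}, nil)
if err != nil {
	return err
}
// Keys packed under the returned DirectorySubspace live inside its allocated prefix.
userKey := dir.Pack(tuple.Tuple{"alice"})
_ = userKey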
- CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) + CreateOrOpen(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) // Open opens the directory specified by path (relative to this Directory), // and returns the directory and its contents as a DirectorySubspace (or ErrDirNotExists @@ -89,7 +90,7 @@ type Directory interface { // If the byte slice layer is specified, it is compared against the layer // specified when the directory was created, and an error is returned if // they differ. - Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) + Open(ctx context.Context, rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) // Create creates a directory specified by path (relative to this // Directory), and returns the directory and its contents as a @@ -97,7 +98,7 @@ type Directory interface { // // If the byte slice layer is specified, it is recorded as the layer and // will be checked when opening the directory in the future. - Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) + Create(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) // CreatePrefix behaves like Create, but uses a manually specified byte // slice prefix to physically store the contents of this directory, rather @@ -106,7 +107,7 @@ type Directory interface { // If this Directory was created in a root directory that does not allow // manual prefixes, CreatePrefix will return an error. The default root // directory does not allow manual prefixes. - CreatePrefix(t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error) + CreatePrefix(ctx context.Context, t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error) // Move moves the directory at oldPath to newPath (both relative to this // Directory), and returns the directory (at its new location) and its @@ -116,7 +117,7 @@ type Directory interface { // // There is no effect on the physical prefix of the given directory or on // clients that already have the directory open. - Move(t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) + Move(ctx context.Context, t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) // MoveTo moves this directory to newAbsolutePath (relative to the root // directory of this Directory), and returns the directory (at its new @@ -126,7 +127,7 @@ type Directory interface { // // There is no effect on the physical prefix of the given directory or on // clients that already have the directory open. - MoveTo(t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) + MoveTo(ctx context.Context, t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) // Remove removes the directory at path (relative to this Directory), its // content, and all subdirectories. Remove returns true if a directory @@ -135,16 +136,16 @@ type Directory interface { // // Note that clients that have already opened this directory might still // insert data into its contents after removal. - Remove(t fdb.Transactor, path []string) (bool, error) + Remove(ctx context.Context, t fdb.Transactor, path []string) (bool, error) // Exists returns true if the directory at path (relative to this Directory) // exists, and false otherwise. 
- Exists(rt fdb.ReadTransactor, path []string) (bool, error) + Exists(ctx context.Context, rt fdb.ReadTransactor, path []string) (bool, error) // List returns the names of the immediate subdirectories of the directory // at path (relative to this Directory) as a slice of strings. Each string // is the name of the last component of a subdirectory's path. - List(rt fdb.ReadTransactor, path []string) ([]string, error) + List(ctx context.Context, rt fdb.ReadTransactor, path []string) ([]string, error) // GetLayer returns the layer specified when this Directory was created. GetLayer() []byte @@ -165,14 +166,14 @@ func stringsEqual(a, b []string) bool { return true } -func moveTo(t fdb.Transactor, dl directoryLayer, path, newAbsolutePath []string) (DirectorySubspace, error) { +func moveTo(ctx context.Context, t fdb.Transactor, dl directoryLayer, path, newAbsolutePath []string) (DirectorySubspace, error) { partition_len := len(dl.path) if !stringsEqual(newAbsolutePath[:partition_len], dl.path) { return nil, errors.New("cannot move between partitions") } - return dl.Move(t, path[partition_len:], newAbsolutePath[partition_len:]) + return dl.Move(ctx, t, path[partition_len:], newAbsolutePath[partition_len:]) } var root = NewDirectoryLayer(subspace.FromBytes([]byte{0xFE}), subspace.AllKeys(), false) @@ -186,8 +187,8 @@ var root = NewDirectoryLayer(subspace.FromBytes([]byte{0xFE}), subspace.AllKeys( // as the layer; if layer is specified and the directory already exists, it is // compared against the layer specified when the directory was created, and an // error is returned if they differ. -func CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { - return root.CreateOrOpen(t, path, layer) +func CreateOrOpen(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { + return root.CreateOrOpen(ctx, t, path, layer) } // Open opens the directory specified by path (resolved relative to the default @@ -197,8 +198,8 @@ func CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubsp // If the byte slice layer is specified, it is compared against the layer // specified when the directory was created, and an error is returned if they // differ. -func Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) { - return root.Open(rt, path, layer) +func Open(ctx context.Context, rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) { + return root.Open(ctx, rt, path, layer) } // Create creates a directory specified by path (resolved relative to the @@ -207,8 +208,8 @@ func Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace // // If the byte slice layer is specified, it is recorded as the layer and will be // checked when opening the directory in the future. -func Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { - return root.Create(t, path, layer) +func Create(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { + return root.Create(ctx, t, path, layer) } // Move moves the directory at oldPath to newPath (both resolved relative to the @@ -219,21 +220,21 @@ func Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, e // // There is no effect on the physical prefix of the given directory or on // clients that already have the directory open. 
-func Move(t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) { - return root.Move(t, oldPath, newPath) +func Move(ctx context.Context, t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) { + return root.Move(ctx, t, oldPath, newPath) } // Exists returns true if the directory at path (relative to the default root // directory) exists, and false otherwise. -func Exists(rt fdb.ReadTransactor, path []string) (bool, error) { - return root.Exists(rt, path) +func Exists(ctx context.Context, rt fdb.ReadTransactor, path []string) (bool, error) { + return root.Exists(ctx, rt, path) } // List returns the names of the immediate subdirectories of the default root // directory as a slice of strings. Each string is the name of the last // component of a subdirectory's path. -func List(rt fdb.ReadTransactor, path []string) ([]string, error) { - return root.List(rt, path) +func List(ctx context.Context, rt fdb.ReadTransactor, path []string) ([]string, error) { + return root.List(ctx, rt, path) } // Root returns the default root directory. Any attempt to move or remove the diff --git a/bindings/go/src/fdb/directory/directory_layer.go b/bindings/go/src/fdb/directory/directory_layer.go index b69d5ee6ca6..82041611e0c 100644 --- a/bindings/go/src/fdb/directory/directory_layer.go +++ b/bindings/go/src/fdb/directory/directory_layer.go @@ -24,6 +24,7 @@ package directory import ( "bytes" + "context" "encoding/binary" "errors" "fmt" @@ -71,8 +72,8 @@ func NewDirectoryLayer(nodeSS, contentSS subspace.Subspace, allowManualPrefixes return dl } -func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transaction, path []string, layer []byte, prefix []byte, allowCreate, allowOpen bool) (DirectorySubspace, error) { - if err := dl.checkVersion(rtr, nil); err != nil { +func (dl directoryLayer) createOrOpen(ctx context.Context, rtr fdb.ReadTransaction, tr *fdb.Transaction, path []string, layer []byte, prefix []byte, allowCreate, allowOpen bool) (DirectorySubspace, error) { + if err := dl.checkVersion(ctx, rtr, nil); err != nil { return nil, err } @@ -87,15 +88,16 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti return nil, errors.New("the root directory cannot be opened") } - existingNode := dl.find(rtr, path).prefetchMetadata(rtr) - if existingNode.exists() { - if existingNode.isInPartition(nil, false) { + existingNode, exists := dl.find(ctx, rtr, path).prefetchMetadata(rtr) + if exists { + defer existingNode._layer.Close() + if existingNode.isInPartition(ctx, nil, false) { subpath := existingNode.getPartitionSubpath() - enc, err := existingNode.getContents(dl, nil) + enc, err := existingNode.getContents(ctx, dl, nil) if err != nil { return nil, err } - return enc.(directoryPartition).createOrOpen(rtr, tr, subpath, layer, prefix, allowCreate, allowOpen) + return enc.(directoryPartition).createOrOpen(ctx, rtr, tr, subpath, layer, prefix, allowCreate, allowOpen) } if !allowOpen { @@ -103,35 +105,35 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti } if layer != nil { - if l, err := existingNode._layer.Get(); err != nil || bytes.Compare(l, layer) != 0 { + if l, err := existingNode._layer.Get(ctx); err != nil || bytes.Compare(l, layer) != 0 { return nil, errors.New("the directory was created with an incompatible layer") } } - return existingNode.getContents(dl, nil) + return existingNode.getContents(ctx, dl, nil) } if !allowCreate { return nil, ErrDirNotExists } - if err := 
dl.checkVersion(rtr, tr); err != nil { + if err := dl.checkVersion(ctx, rtr, tr); err != nil { return nil, err } if prefix == nil { - newss, err := dl.allocator.allocate(*tr, dl.contentSS) + newss, err := dl.allocator.allocate(ctx, *tr, dl.contentSS) if err != nil { return nil, fmt.Errorf("unable to allocate new directory prefix (%s)", err.Error()) } - if !isRangeEmpty(rtr, newss) { + if !isRangeEmpty(ctx, rtr, newss) { return nil, fmt.Errorf("the database has keys stored at the prefix chosen by the automatic prefix allocator: %v", prefix) } prefix = newss.Bytes() - pf, err := dl.isPrefixFree(rtr.Snapshot(), prefix) + pf, err := dl.isPrefixFree(ctx, rtr.Snapshot(), prefix) if err != nil { return nil, err } @@ -139,7 +141,7 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti return nil, errors.New("the directory layer has manually allocated prefixes that conflict with the automatic prefix allocator") } } else { - pf, err := dl.isPrefixFree(rtr, prefix) + pf, err := dl.isPrefixFree(ctx, rtr, prefix) if err != nil { return nil, err } @@ -151,7 +153,7 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti var parentNode subspace.Subspace if len(path) > 1 { - pd, err := dl.createOrOpen(rtr, tr, path[:len(path)-1], nil, nil, true, true) + pd, err := dl.createOrOpen(ctx, rtr, tr, path[:len(path)-1], nil, nil, true, true) if err != nil { return nil, err } @@ -176,66 +178,72 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti return dl.contentsOfNode(node, path, layer) } -func (dl directoryLayer) CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { - r, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { - return dl.createOrOpen(tr, &tr, path, layer, nil, true, true) +func (dl directoryLayer) CreateOrOpen(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { + r, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + return dl.createOrOpen(ctx, tr, &tr, path, layer, nil, true, true) }) if err != nil { return nil, err } + txClose() return r.(DirectorySubspace), nil } -func (dl directoryLayer) Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { - r, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { - return dl.createOrOpen(tr, &tr, path, layer, nil, true, false) +func (dl directoryLayer) Create(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { + r, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + return dl.createOrOpen(ctx, tr, &tr, path, layer, nil, true, false) }) if err != nil { return nil, err } + txClose() return r.(DirectorySubspace), nil } -func (dl directoryLayer) CreatePrefix(t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error) { +func (dl directoryLayer) CreatePrefix(ctx context.Context, t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error) { if prefix == nil { prefix = []byte{} } - r, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { - return dl.createOrOpen(tr, &tr, path, layer, prefix, true, false) + r, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + return dl.createOrOpen(ctx, tr, &tr, path, layer, prefix, true, false) }) if err != nil { return nil, err } + txClose() return r.(DirectorySubspace), nil } -func (dl directoryLayer) 
Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) { - r, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { - return dl.createOrOpen(rtr, nil, path, layer, nil, false, true) +func (dl directoryLayer) Open(ctx context.Context, rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) { + r, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { + return dl.createOrOpen(ctx, rtr, nil, path, layer, nil, false, true) }) if err != nil { return nil, err } + txClose() return r.(DirectorySubspace), nil } -func (dl directoryLayer) Exists(rt fdb.ReadTransactor, path []string) (bool, error) { - r, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { - if err := dl.checkVersion(rtr, nil); err != nil { +func (dl directoryLayer) Exists(ctx context.Context, rt fdb.ReadTransactor, path []string) (bool, error) { + r, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { + if err := dl.checkVersion(ctx, rtr, nil); err != nil { return false, err } - node := dl.find(rtr, path).prefetchMetadata(rtr) - if !node.exists() { + node, exists := dl.find(ctx, rtr, path).prefetchMetadata(rtr) + if !exists { return false, nil + } else { + defer node._layer.Close() } - if node.isInPartition(nil, false) { - nc, err := node.getContents(dl, nil) + if node.isInPartition(ctx, nil, false) { + nc, err := node.getContents(ctx, dl, nil) if err != nil { return false, err } - return nc.Exists(rtr, node.getPartitionSubpath()) + return nc.Exists(ctx, rtr, node.getPartitionSubpath()) } return true, nil @@ -243,43 +251,47 @@ func (dl directoryLayer) Exists(rt fdb.ReadTransactor, path []string) (bool, err if err != nil { return false, err } + txClose() return r.(bool), nil } -func (dl directoryLayer) List(rt fdb.ReadTransactor, path []string) ([]string, error) { - r, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { - if err := dl.checkVersion(rtr, nil); err != nil { +func (dl directoryLayer) List(ctx context.Context, rt fdb.ReadTransactor, path []string) ([]string, error) { + r, txClose, err := rt.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) { + if err := dl.checkVersion(ctx, rtr, nil); err != nil { return nil, err } - node := dl.find(rtr, path).prefetchMetadata(rtr) - if !node.exists() { + node, exists := dl.find(ctx, rtr, path).prefetchMetadata(rtr) + if !exists { return nil, ErrDirNotExists + } else { + defer node._layer.Close() } - if node.isInPartition(nil, true) { - nc, err := node.getContents(dl, nil) + if node.isInPartition(ctx, nil, true) { + nc, err := node.getContents(ctx, dl, nil) if err != nil { return nil, err } - return nc.List(rtr, node.getPartitionSubpath()) + return nc.List(ctx, rtr, node.getPartitionSubpath()) } - return dl.subdirNames(rtr, node.subspace) + return dl.subdirNames(ctx, rtr, node.subspace) }) if err != nil { return nil, err } + txClose() return r.([]string), nil } -func (dl directoryLayer) MoveTo(t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) { +func (dl directoryLayer) MoveTo(ctx context.Context, t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) { return nil, errors.New("the root directory cannot be moved") } -func (dl directoryLayer) Move(t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) { - r, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { - if err := dl.checkVersion(tr, &tr); err != nil 
{ +func (dl directoryLayer) Move(ctx context.Context, t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) { + r, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + if err := dl.checkVersion(ctx, tr, &tr); err != nil { return nil, err } @@ -291,30 +303,36 @@ func (dl directoryLayer) Move(t fdb.Transactor, oldPath []string, newPath []stri return nil, errors.New("the destination directory cannot be a subdirectory of the source directory") } - oldNode := dl.find(tr, oldPath).prefetchMetadata(tr) - newNode := dl.find(tr, newPath).prefetchMetadata(tr) + oldNode, oldNodeExists := dl.find(ctx, tr, oldPath).prefetchMetadata(tr) + if oldNodeExists { + defer oldNode._layer.Close() + } + newNode, newNodeExists := dl.find(ctx, tr, newPath).prefetchMetadata(tr) + if newNodeExists { + defer newNode._layer.Close() + } if !oldNode.exists() { return nil, errors.New("the source directory does not exist") } - if oldNode.isInPartition(nil, false) || newNode.isInPartition(nil, false) { - if !oldNode.isInPartition(nil, false) || !newNode.isInPartition(nil, false) || !stringsEqual(oldNode.path, newNode.path) { + if oldNode.isInPartition(ctx, nil, false) || newNode.isInPartition(ctx, nil, false) { + if !oldNode.isInPartition(ctx, nil, false) || !newNode.isInPartition(ctx, nil, false) || !stringsEqual(oldNode.path, newNode.path) { return nil, errors.New("cannot move between partitions") } - nnc, err := newNode.getContents(dl, nil) + nnc, err := newNode.getContents(ctx, dl, nil) if err != nil { return nil, err } - return nnc.Move(tr, oldNode.getPartitionSubpath(), newNode.getPartitionSubpath()) + return nnc.Move(ctx, tr, oldNode.getPartitionSubpath(), newNode.getPartitionSubpath()) } if newNode.exists() { return nil, errors.New("the destination directory already exists. Remove it first") } - parentNode := dl.find(tr, newPath[:len(newPath)-1]) + parentNode := dl.find(ctx, tr, newPath[:len(newPath)-1]) if !parentNode.exists() { return nil, errors.New("the parent of the destination directory does not exist. 
Create it first") } @@ -325,9 +343,9 @@ func (dl directoryLayer) Move(t fdb.Transactor, oldPath []string, newPath []stri } tr.Set(parentNode.subspace.Sub(_SUBDIRS, newPath[len(newPath)-1]), p[0].([]byte)) - dl.removeFromParent(tr, oldPath) + dl.removeFromParent(ctx, tr, oldPath) - l, err := oldNode._layer.Get() + l, err := oldNode._layer.Get(ctx) if err != nil { return nil, err } @@ -336,12 +354,13 @@ func (dl directoryLayer) Move(t fdb.Transactor, oldPath []string, newPath []stri if err != nil { return nil, err } + txClose() return r.(DirectorySubspace), nil } -func (dl directoryLayer) Remove(t fdb.Transactor, path []string) (bool, error) { - r, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { - if err := dl.checkVersion(tr, &tr); err != nil { +func (dl directoryLayer) Remove(ctx context.Context, t fdb.Transactor, path []string) (bool, error) { + r, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) { + if err := dl.checkVersion(ctx, tr, &tr); err != nil { return false, err } @@ -349,37 +368,39 @@ func (dl directoryLayer) Remove(t fdb.Transactor, path []string) (bool, error) { return false, errors.New("the root directory cannot be removed") } - node := dl.find(tr, path).prefetchMetadata(tr) - - if !node.exists() { + node, exists := dl.find(ctx, tr, path).prefetchMetadata(tr) + if !exists { return false, nil + } else { + defer node._layer.Close() } - if node.isInPartition(nil, false) { - nc, err := node.getContents(dl, nil) + if node.isInPartition(ctx, nil, false) { + nc, err := node.getContents(ctx, dl, nil) if err != nil { return false, err } - return nc.(directoryPartition).Remove(tr, node.getPartitionSubpath()) + return nc.(directoryPartition).Remove(ctx, tr, node.getPartitionSubpath()) } - if err := dl.removeRecursive(tr, node.subspace); err != nil { + if err := dl.removeRecursive(ctx, tr, node.subspace); err != nil { return false, err } - dl.removeFromParent(tr, path) + dl.removeFromParent(ctx, tr, path) return true, nil }) if err != nil { return false, err } + txClose() return r.(bool), nil } -func (dl directoryLayer) removeRecursive(tr fdb.Transaction, node subspace.Subspace) error { - nodes := dl.subdirNodes(tr, node) +func (dl directoryLayer) removeRecursive(ctx context.Context, tr fdb.Transaction, node subspace.Subspace) error { + nodes := dl.subdirNodes(ctx, tr, node) for i := range nodes { - if err := dl.removeRecursive(tr, nodes[i]); err != nil { + if err := dl.removeRecursive(ctx, tr, nodes[i]); err != nil { return err } } @@ -399,8 +420,8 @@ func (dl directoryLayer) removeRecursive(tr fdb.Transaction, node subspace.Subsp return nil } -func (dl directoryLayer) removeFromParent(tr fdb.Transaction, path []string) { - parent := dl.find(tr, path[:len(path)-1]) +func (dl directoryLayer) removeFromParent(ctx context.Context, tr fdb.Transaction, path []string) { + parent := dl.find(ctx, tr, path[:len(path)-1]) tr.Clear(parent.subspace.Sub(_SUBDIRS, path[len(path)-1])) } @@ -412,20 +433,14 @@ func (dl directoryLayer) GetPath() []string { return dl.path } -func (dl directoryLayer) subdirNames(rtr fdb.ReadTransaction, node subspace.Subspace) ([]string, error) { +func (dl directoryLayer) subdirNames(ctx context.Context, rtr fdb.ReadTransaction, node subspace.Subspace) ([]string, error) { sd := node.Sub(_SUBDIRS) rr := rtr.GetRange(sd, fdb.RangeOptions{}) - ri := rr.Iterator() var ret []string - - for ri.Advance() { - kv, err := ri.Get() - if err != nil { - return nil, err - } - + kvs := rr.MustGet(ctx) + for _, kv := range kvs { p, err := 
sd.Unpack(kv.Key) if err != nil { return nil, err @@ -437,24 +452,21 @@ func (dl directoryLayer) subdirNames(rtr fdb.ReadTransaction, node subspace.Subs return ret, nil } -func (dl directoryLayer) subdirNodes(tr fdb.Transaction, node subspace.Subspace) []subspace.Subspace { +func (dl directoryLayer) subdirNodes(ctx context.Context, tr fdb.Transaction, node subspace.Subspace) []subspace.Subspace { sd := node.Sub(_SUBDIRS) rr := tr.GetRange(sd, fdb.RangeOptions{}) - ri := rr.Iterator() var ret []subspace.Subspace - - for ri.Advance() { - kv := ri.MustGet() - + kvs := rr.MustGet(ctx) + for _, kv := range kvs { ret = append(ret, dl.nodeWithPrefix(kv.Value)) } return ret } -func (dl directoryLayer) nodeContainingKey(rtr fdb.ReadTransaction, key []byte) (subspace.Subspace, error) { +func (dl directoryLayer) nodeContainingKey(ctx context.Context, rtr fdb.ReadTransaction, key []byte) (subspace.Subspace, error) { if bytes.HasPrefix(key, dl.nodeSS.Bytes()) { return dl.rootNode, nil } @@ -462,7 +474,7 @@ func (dl directoryLayer) nodeContainingKey(rtr fdb.ReadTransaction, key []byte) bk, _ := dl.nodeSS.FDBRangeKeys() kr := fdb.KeyRange{bk, fdb.Key(append(dl.nodeSS.Pack(tuple.Tuple{key}), 0x00))} - kvs, err := rtr.GetRange(kr, fdb.RangeOptions{Reverse: true, Limit: 1}).GetSliceWithError() + kvs, err := rtr.GetRange(kr, fdb.RangeOptions{Reverse: true, Limit: 1}).Get(ctx) if err != nil { return nil, err } @@ -480,12 +492,12 @@ func (dl directoryLayer) nodeContainingKey(rtr fdb.ReadTransaction, key []byte) return nil, nil } -func (dl directoryLayer) isPrefixFree(rtr fdb.ReadTransaction, prefix []byte) (bool, error) { +func (dl directoryLayer) isPrefixFree(ctx context.Context, rtr fdb.ReadTransaction, prefix []byte) (bool, error) { if len(prefix) == 0 { return false, nil } - nck, err := dl.nodeContainingKey(rtr, prefix) + nck, err := dl.nodeContainingKey(ctx, rtr, prefix) if err != nil { return false, err } @@ -499,15 +511,18 @@ func (dl directoryLayer) isPrefixFree(rtr fdb.ReadTransaction, prefix []byte) (b } bk, ek := kr.FDBRangeKeys() - if !isRangeEmpty(rtr, fdb.KeyRange{dl.nodeSS.Pack(tuple.Tuple{bk}), dl.nodeSS.Pack(tuple.Tuple{ek})}) { + if !isRangeEmpty(ctx, rtr, fdb.KeyRange{dl.nodeSS.Pack(tuple.Tuple{bk}), dl.nodeSS.Pack(tuple.Tuple{ek})}) { return false, nil } return true, nil } -func (dl directoryLayer) checkVersion(rtr fdb.ReadTransaction, tr *fdb.Transaction) error { - version, err := rtr.Get(dl.rootNode.Sub([]byte("version"))).Get() +func (dl directoryLayer) checkVersion(ctx context.Context, rtr fdb.ReadTransaction, tr *fdb.Transaction) error { + f := rtr.Get(dl.rootNode.Sub([]byte("version"))) + defer f.Close() + + version, err := f.Get(ctx) if err != nil { return err } @@ -588,14 +603,33 @@ func (dl directoryLayer) nodeWithPrefix(prefix []byte) subspace.Subspace { return dl.nodeSS.Sub(prefix) } -func (dl directoryLayer) find(rtr fdb.ReadTransaction, path []string) *node { +// find will find the specified path using multiple futures. +// Caller is responsible for closing the layer information future of the returned node. 
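checkVersion above now owns its future explicitly (create, defer Close, Get with the context); the same pattern works for any future resolved inside the retry closure. A sketch under the same assumptions as the first sketch, with illustrative key names:

_, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) {
	f := tr.Get(fdb.Key("illustrative-key"))
	defer f.Close() // a future resolved entirely inside the closure can be closed here
	v, err := f.Get(ctx)
	if err != nil {
		return nil, err
	}
	tr.Set(fdb.Key("illustrative-copy"), v)
	return nil, nil
})
if err != nil {
	return err
}
txClose()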
+func (dl directoryLayer) find(ctx context.Context, rtr fdb.ReadTransaction, path []string) *node { n := &node{dl.rootNode, []string{}, path, nil} + futures := make([]fdb.FutureByteSlice, len(path)) for i := range path { - n = &node{dl.nodeWithPrefix(rtr.Get(n.subspace.Sub(_SUBDIRS, path[i])).MustGet()), path[:i+1], path, nil} - if !n.exists() || bytes.Compare(n.layer(rtr).MustGet(), []byte("partition")) == 0 { + futures[i] = rtr.Get(n.subspace.Sub(_SUBDIRS, path[i])) + } + defer func() { + for _, f := range futures { + f.Close() + } + }() + + n = nil + for i, f := range futures { + if n.exists() { + // release resources if this node is not going to be returned + n._layer.Close() + } + + n = &node{dl.nodeWithPrefix(f.MustGet(ctx)), path[:i+1], path, nil} + if !n.exists() || bytes.Compare(n.layer(rtr).MustGet(ctx), []byte("partition")) == 0 { return n } } + return n } @@ -606,8 +640,8 @@ func (dl directoryLayer) partitionSubpath(lpath, rpath []string) []string { return r } -func isRangeEmpty(rtr fdb.ReadTransaction, r fdb.Range) bool { - kvs := rtr.GetRange(r, fdb.RangeOptions{Limit: 1}).GetSliceOrPanic() +func isRangeEmpty(ctx context.Context, rtr fdb.ReadTransaction, r fdb.Range) bool { + kvs := rtr.GetRange(r, fdb.RangeOptions{Limit: 1}).MustGet(ctx) return len(kvs) == 0 } diff --git a/bindings/go/src/fdb/directory/directory_partition.go b/bindings/go/src/fdb/directory/directory_partition.go index 2e15de2dcfe..13450ac6cee 100644 --- a/bindings/go/src/fdb/directory/directory_partition.go +++ b/bindings/go/src/fdb/directory/directory_partition.go @@ -23,6 +23,8 @@ package directory import ( + "context" + "github.com/apple/foundationdb/bindings/go/src/fdb" "github.com/apple/foundationdb/bindings/go/src/fdb/subspace" "github.com/apple/foundationdb/bindings/go/src/fdb/tuple" @@ -80,16 +82,16 @@ func (dp directoryPartition) getLayerForPath(path []string) directoryLayer { return dp.directoryLayer } -func (dp directoryPartition) MoveTo(t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) { - return moveTo(t, dp.parentDirectoryLayer, dp.path, newAbsolutePath) +func (dp directoryPartition) MoveTo(ctx context.Context, t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) { + return moveTo(ctx, t, dp.parentDirectoryLayer, dp.path, newAbsolutePath) } -func (dp directoryPartition) Remove(t fdb.Transactor, path []string) (bool, error) { +func (dp directoryPartition) Remove(ctx context.Context, t fdb.Transactor, path []string) (bool, error) { dl := dp.getLayerForPath(path) - return dl.Remove(t, dl.partitionSubpath(dp.path, path)) + return dl.Remove(ctx, t, dl.partitionSubpath(dp.path, path)) } -func (dp directoryPartition) Exists(rt fdb.ReadTransactor, path []string) (bool, error) { +func (dp directoryPartition) Exists(ctx context.Context, rt fdb.ReadTransactor, path []string) (bool, error) { dl := dp.getLayerForPath(path) - return dl.Exists(rt, dl.partitionSubpath(dp.path, path)) + return dl.Exists(ctx, rt, dl.partitionSubpath(dp.path, path)) } diff --git a/bindings/go/src/fdb/directory/directory_subspace.go b/bindings/go/src/fdb/directory/directory_subspace.go index d9948e203d3..7d94a6728c8 100644 --- a/bindings/go/src/fdb/directory/directory_subspace.go +++ b/bindings/go/src/fdb/directory/directory_subspace.go @@ -23,6 +23,7 @@ package directory import ( + "context" "fmt" "strings" @@ -58,40 +59,40 @@ func (ds directorySubspace) String() string { return fmt.Sprintf("DirectorySubspace(%s, %s)", path, fdb.Printable(ds.Bytes())) } -func (d directorySubspace) 
CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { - return d.dl.CreateOrOpen(t, d.dl.partitionSubpath(d.path, path), layer) +func (d directorySubspace) CreateOrOpen(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { + return d.dl.CreateOrOpen(ctx, t, d.dl.partitionSubpath(d.path, path), layer) } -func (d directorySubspace) Create(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { - return d.dl.Create(t, d.dl.partitionSubpath(d.path, path), layer) +func (d directorySubspace) Create(ctx context.Context, t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) { + return d.dl.Create(ctx, t, d.dl.partitionSubpath(d.path, path), layer) } -func (d directorySubspace) CreatePrefix(t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error) { - return d.dl.CreatePrefix(t, d.dl.partitionSubpath(d.path, path), layer, prefix) +func (d directorySubspace) CreatePrefix(ctx context.Context, t fdb.Transactor, path []string, layer []byte, prefix []byte) (DirectorySubspace, error) { + return d.dl.CreatePrefix(ctx, t, d.dl.partitionSubpath(d.path, path), layer, prefix) } -func (d directorySubspace) Open(rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) { - return d.dl.Open(rt, d.dl.partitionSubpath(d.path, path), layer) +func (d directorySubspace) Open(ctx context.Context, rt fdb.ReadTransactor, path []string, layer []byte) (DirectorySubspace, error) { + return d.dl.Open(ctx, rt, d.dl.partitionSubpath(d.path, path), layer) } -func (d directorySubspace) MoveTo(t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) { - return moveTo(t, d.dl, d.path, newAbsolutePath) +func (d directorySubspace) MoveTo(ctx context.Context, t fdb.Transactor, newAbsolutePath []string) (DirectorySubspace, error) { + return moveTo(ctx, t, d.dl, d.path, newAbsolutePath) } -func (d directorySubspace) Move(t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) { - return d.dl.Move(t, d.dl.partitionSubpath(d.path, oldPath), d.dl.partitionSubpath(d.path, newPath)) +func (d directorySubspace) Move(ctx context.Context, t fdb.Transactor, oldPath []string, newPath []string) (DirectorySubspace, error) { + return d.dl.Move(ctx, t, d.dl.partitionSubpath(d.path, oldPath), d.dl.partitionSubpath(d.path, newPath)) } -func (d directorySubspace) Remove(t fdb.Transactor, path []string) (bool, error) { - return d.dl.Remove(t, d.dl.partitionSubpath(d.path, path)) +func (d directorySubspace) Remove(ctx context.Context, t fdb.Transactor, path []string) (bool, error) { + return d.dl.Remove(ctx, t, d.dl.partitionSubpath(d.path, path)) } -func (d directorySubspace) Exists(rt fdb.ReadTransactor, path []string) (bool, error) { - return d.dl.Exists(rt, d.dl.partitionSubpath(d.path, path)) +func (d directorySubspace) Exists(ctx context.Context, rt fdb.ReadTransactor, path []string) (bool, error) { + return d.dl.Exists(ctx, rt, d.dl.partitionSubpath(d.path, path)) } -func (d directorySubspace) List(rt fdb.ReadTransactor, path []string) ([]string, error) { - return d.dl.List(rt, d.dl.partitionSubpath(d.path, path)) +func (d directorySubspace) List(ctx context.Context, rt fdb.ReadTransactor, path []string) ([]string, error) { + return d.dl.List(ctx, rt, d.dl.partitionSubpath(d.path, path)) } func (d directorySubspace) GetLayer() []byte { diff --git a/bindings/go/src/fdb/directory/node.go b/bindings/go/src/fdb/directory/node.go index 
57c3705ddb0..ea67e816387 100644 --- a/bindings/go/src/fdb/directory/node.go +++ b/bindings/go/src/fdb/directory/node.go @@ -24,6 +24,7 @@ package directory import ( "bytes" + "context" "github.com/apple/foundationdb/bindings/go/src/fdb" "github.com/apple/foundationdb/bindings/go/src/fdb/subspace" @@ -43,13 +44,19 @@ func (n *node) exists() bool { return true } -func (n *node) prefetchMetadata(rtr fdb.ReadTransaction) *node { +// prefetchMetadata will make sure that layer information starts being prefetched. +// If subspace is nil, this is a no-op and returns false. +// Caller is responsible for closing the future used to fetch layer information. +func (n *node) prefetchMetadata(rtr fdb.ReadTransaction) (*node, bool) { if n.exists() { n.layer(rtr) + return n, true } - return n + return n, false } +// layer will start a future to fetch layer information, unless one already exists. +// Caller is responsible for properly closing this future. func (n *node) layer(rtr fdb.ReadTransaction) fdb.FutureByteSlice { if n._layer == nil { fv := rtr.Get(n.subspace.Sub([]byte("layer"))) @@ -59,16 +66,16 @@ func (n *node) layer(rtr fdb.ReadTransaction) fdb.FutureByteSlice { return n._layer } -func (n *node) isInPartition(tr *fdb.Transaction, includeEmptySubpath bool) bool { - return n.exists() && bytes.Compare(n._layer.MustGet(), []byte("partition")) == 0 && (includeEmptySubpath || len(n.targetPath) > len(n.path)) +func (n *node) isInPartition(ctx context.Context, tr *fdb.Transaction, includeEmptySubpath bool) bool { + return n.exists() && bytes.Compare(n._layer.MustGet(ctx), []byte("partition")) == 0 && (includeEmptySubpath || len(n.targetPath) > len(n.path)) } func (n *node) getPartitionSubpath() []string { return n.targetPath[len(n.path):] } -func (n *node) getContents(dl directoryLayer, tr *fdb.Transaction) (DirectorySubspace, error) { - l, err := n._layer.Get() +func (n *node) getContents(ctx context.Context, dl directoryLayer, tr *fdb.Transaction) (DirectorySubspace, error) { + l, err := n._layer.Get(ctx) if err != nil { return nil, err } diff --git a/bindings/go/src/fdb/errors_test.go b/bindings/go/src/fdb/errors_test.go index 1f1474decbc..4d52136126d 100644 --- a/bindings/go/src/fdb/errors_test.go +++ b/bindings/go/src/fdb/errors_test.go @@ -23,6 +23,7 @@ package fdb import ( + "context" "errors" "fmt" "testing" @@ -52,11 +53,16 @@ func TestErrorWrapping(t *testing.T) { } for _, inputError := range testCases { - _, outputError := db.ReadTransact(func(rtr ReadTransaction) (interface{}, error) { + _, txClose, outputError := db.ReadTransact(context.Background(), func(rtr ReadTransaction) (interface{}, error) { return nil, inputError }) if inputError != outputError { t.Errorf("expected error %v to be the same as %v", outputError, inputError) } + if outputError == nil && txClose == nil { + t.Error("close function should not be nil on success") + } else if outputError != nil && txClose != nil { + t.Error("close function should be nil in case of error") + } } } diff --git a/bindings/go/src/fdb/fdb.go b/bindings/go/src/fdb/fdb.go index d4c62168609..211729189a6 100644 --- a/bindings/go/src/fdb/fdb.go +++ b/bindings/go/src/fdb/fdb.go @@ -29,6 +29,7 @@ import "C" import ( "bytes" + "context" "errors" "fmt" "sync" @@ -51,10 +52,10 @@ var ( // exports and functions in preamble // (https://code.google.com/p/go-wiki/wiki/cgo#Global_functions) // -//export unlockMutex -func unlockMutex(p unsafe.Pointer) { - m := (*sync.Mutex)(p) - m.Unlock() +//export goFutureReadyCallback +func goFutureReadyCallback(pF, pRs 
unsafe.Pointer) { + rs := (*readySignal)(pRs) + close(*rs) } // A Transactor can execute a function that requires a Transaction. Functions @@ -64,7 +65,8 @@ type Transactor interface { // Transact executes the caller-provided function, providing it with a // Transaction (itself a Transactor, allowing composition of transactional // functions). - Transact(func(Transaction) (interface{}, error)) (interface{}, error) + // The returned function must be called after all futures have been closed, and is nil in case of error. + Transact(context.Context, func(Transaction) (interface{}, error)) (interface{}, func(), error) // All Transactors are also ReadTransactors, allowing them to be used with // read-only transactional functions. @@ -79,7 +81,8 @@ type ReadTransactor interface { // ReadTransact executes the caller-provided function, providing it with a // ReadTransaction (itself a ReadTransactor, allowing composition of // read-only transactional functions). - ReadTransact(func(ReadTransaction) (interface{}, error)) (interface{}, error) + // The returned function must be called after all futures have been closed, and is nil in case of error. + ReadTransact(context.Context, func(ReadTransaction) (interface{}, error)) (interface{}, func(), error) } func setOpt(setter func(*C.uint8_t, C.int) C.fdb_error_t, param []byte) error { diff --git a/bindings/go/src/fdb/fdb_test.go b/bindings/go/src/fdb/fdb_test.go index 8a6753224c0..49bcbe78fa4 100644 --- a/bindings/go/src/fdb/fdb_test.go +++ b/bindings/go/src/fdb/fdb_test.go @@ -23,6 +23,7 @@ package fdb_test import ( + "context" "fmt" "os" "testing" @@ -62,23 +63,29 @@ func TestVersionstamp(t *testing.T) { fdb.MustAPIVersion(API_VERSION) db := fdb.MustOpenDefault() - setVs := func(t fdb.Transactor, key fdb.Key) (fdb.FutureKey, error) { - fmt.Printf("setOne called with: %T\n", t) - ret, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { + setVs := func(tr fdb.Transactor, key fdb.Key) (fdb.FutureKey, error) { + fmt.Printf("setOne called with: %T\n", tr) + ret, txClose, err := tr.Transact(context.Background(), func(tr fdb.Transaction) (interface{}, error) { tr.SetVersionstampedValue(key, []byte("blahblahbl\x00\x00\x00\x00")) return tr.GetVersionstamp(), nil }) + if err == nil { + t.Cleanup(txClose) + } return ret.(fdb.FutureKey), err } getOne := func(rt fdb.ReadTransactor, key fdb.Key) ([]byte, error) { fmt.Printf("getOne called with: %T\n", rt) - ret, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { - return rtr.Get(key).MustGet(), nil + ret, txClose, err := rt.ReadTransact(context.Background(), func(rtr fdb.ReadTransaction) (interface{}, error) { + f := rtr.Get(key) + defer f.Close() + return f.MustGet(context.Background()), nil }) if err != nil { return nil, err } + txClose() return ret.([]byte), nil } @@ -109,14 +116,15 @@ func TestEstimatedRangeSize(t *testing.T) { db := fdb.MustOpenDefault() var f fdb.FutureInt64 - _, err := db.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + _, txClose, err := db.ReadTransact(context.Background(), func(rtr fdb.ReadTransaction) (interface{}, error) { f = rtr.GetEstimatedRangeSizeBytes(subspace.AllKeys()) return nil, nil }) if err != nil { - t.Error(err) + t.Fatal(err) } + t.Cleanup(txClose) - _, err = f.Get() + _, err = f.Get(context.Background()) if err != nil { @@ -127,37 +135,48 @@ func TestReadTransactionOptions(t *testing.T) { fdb.MustAPIVersion(API_VERSION) db := fdb.MustOpenDefault() - _, err := db.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + _,
txClose, err := db.ReadTransact(context.Background(), func(rtr fdb.ReadTransaction) (interface{}, error) { rtr.Options().SetAccessSystemKeys() - return rtr.Get(fdb.Key("\xff/")).MustGet(), nil + f := rtr.Get(fdb.Key("\xff/")) + defer f.Close() + return f.MustGet(context.Background()), nil }) if err != nil { - t.Errorf("Failed to read system key: %s", err) + t.Fatalf("Failed to read system key: %s", err) } + txClose() } func ExampleTransactor() { fdb.MustAPIVersion(API_VERSION) db := fdb.MustOpenDefault() - setOne := func(t fdb.Transactor, key fdb.Key, value []byte) error { - fmt.Printf("setOne called with: %T\n", t) - _, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { + setOne := func(tr fdb.Transactor, key fdb.Key, value []byte) error { + fmt.Printf("setOne called with: %T\n", tr) + _, txClose, err := tr.Transact(context.Background(), func(tr fdb.Transaction) (interface{}, error) { // We don't actually call tr.Set here to avoid mutating a real database. // tr.Set(key, value) return nil, nil }) + if err == nil { + // close transaction since no futures were created in it + txClose() + } return err } - setMany := func(t fdb.Transactor, value []byte, keys ...fdb.Key) error { - fmt.Printf("setMany called with: %T\n", t) - _, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) { + setMany := func(tr fdb.Transactor, value []byte, keys ...fdb.Key) error { + fmt.Printf("setMany called with: %T\n", tr) + _, txClose, err := tr.Transact(context.Background(), func(tr fdb.Transaction) (interface{}, error) { for _, key := range keys { setOne(tr, key, value) } return nil, nil }) + if err == nil { + // close transaction since no futures were created in it + txClose() + } return err } @@ -193,25 +212,35 @@ func ExampleReadTransactor() { getOne := func(rt fdb.ReadTransactor, key fdb.Key) ([]byte, error) { fmt.Printf("getOne called with: %T\n", rt) - ret, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { - return rtr.Get(key).MustGet(), nil + ret, txClose, err := rt.ReadTransact(context.Background(), func(rtr fdb.ReadTransaction) (interface{}, error) { + f := rtr.Get(key) + defer f.Close() + return f.MustGet(context.Background()), nil }) if err != nil { return nil, err } + txClose() return ret.([]byte), nil } getTwo := func(rt fdb.ReadTransactor, key1, key2 fdb.Key) ([][]byte, error) { fmt.Printf("getTwo called with: %T\n", rt) - ret, err := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { - r1, _ := getOne(rtr, key1) - r2, _ := getOne(rtr.Snapshot(), key2) + ret, txClose, err := rt.ReadTransact(context.Background(), func(rtr fdb.ReadTransaction) (interface{}, error) { + r1, err := getOne(rtr, key1) + if err != nil { + return nil, err + } + r2, err := getOne(rtr.Snapshot(), key2) + if err != nil { + return nil, err + } return [][]byte{r1, r2}, nil }) if err != nil { return nil, err } + txClose() return ret.([][]byte), nil } @@ -265,7 +294,7 @@ func ExamplePrefixRange() { pr, _ := fdb.PrefixRange([]byte("alphabet")) // Read and process the range - kvs, err := tr.GetRange(pr, fdb.RangeOptions{}).GetSliceWithError() + kvs, err := tr.GetRange(pr, fdb.RangeOptions{}).Get(context.Background()) if err != nil { fmt.Printf("Unable to read range: %v\n", err) } @@ -298,15 +327,21 @@ func ExampleRangeIterator() { rr := tr.GetRange(fdb.KeyRange{fdb.Key(""), fdb.Key{0xFF}}, fdb.RangeOptions{}) ri := rr.Iterator() + defer ri.Close() - // Advance will return true until the iterator is exhausted - for ri.Advance() { - kv, err := ri.Get() + // NextBatch returns batches of values until the iterator is exhausted + for
{ + kvs, err := ri.NextBatch(context.Background(), true) if err != nil { fmt.Printf("Unable to read next value: %v\n", err) return } - fmt.Printf("%s is %s\n", kv.Key, kv.Value) + for _, kv := range kvs { + fmt.Printf("%s is %s\n", kv.Key, kv.Value) + } + if len(kvs) == 0 { + break + } } // Output: diff --git a/bindings/go/src/fdb/futures.go b/bindings/go/src/fdb/futures.go index d47d0a12239..550ff141f76 100644 --- a/bindings/go/src/fdb/futures.go +++ b/bindings/go/src/fdb/futures.go @@ -27,19 +27,19 @@ package fdb // #include // #include // -// extern void unlockMutex(void*); +// extern void goFutureReadyCallback(void*, void*); // -// void go_callback(FDBFuture* f, void* m) { -// unlockMutex(m); +// void c_future_ready_callback(FDBFuture* f, void* rs) { +// goFutureReadyCallback(f, rs); // } // -// void go_set_callback(void* f, void* m) { -// fdb_future_set_callback(f, (FDBCallback)&go_callback, m); +// void c_set_callback(void* f, void* rs) { +// fdb_future_set_callback(f, (FDBCallback)&c_future_ready_callback, rs); // } import "C" import ( - "runtime" + "context" "sync" "unsafe" ) @@ -47,85 +47,76 @@ import ( // A Future represents a value (or error) to be available at some later // time. Asynchronous FDB API functions return one of the types that implement // the Future interface. All Future types additionally implement Get and MustGet -// methods with different return types. Calling BlockUntilReady, Get or MustGet -// will block the calling goroutine until the Future is ready. +// methods with different return types. Calling Get or MustGet will block +// the calling goroutine until the Future is ready. type Future interface { - // BlockUntilReady blocks the calling goroutine until the future is ready. A - // future becomes ready either when it receives a value of its enclosed type - // (if any) or is set to an error state. - BlockUntilReady() - // IsReady returns true if the future is ready, and false otherwise, without // blocking. A future is ready either when has received a value of its // enclosed type (if any) or has been set to an error state. IsReady() bool - // Cancel cancels a future and its associated asynchronous operation. If - // called before the future becomes ready, attempts to access the future - // will return an error. Cancel has no effect if the future is already - // ready. - // - // Note that even if a future is not ready, the associated asynchronous - // operation may already have completed and be unable to be cancelled. - Cancel() + // Close will release resources associated with this future. + // It must always be called at least once. Close() } type future struct { - // db is used to hint Go's GC about the dependency on the parent database object. - // This prevents the database to be garbage-collected before future is out of scope. - db *database - // t is used to hint Go's GC about the dependency on the parent transaction object. - // This prevents the transaction to be garbage-collected before future is out of scope. - t *transaction - ptr *C.FDBFuture -} -func newFuture(t *transaction, ptr *C.FDBFuture) *future { - return newFutureWithDb(nil, t, ptr) + ptr *C.FDBFuture + closer sync.Once } -func newFutureWithDb(db *database, t *transaction, ptr *C.FDBFuture) *future { +// newFuture returns a future which must be explicitly released with a call to Close().
+func newFuture(ptr *C.FDBFuture) *future { f := &future{ - db: db, - t: t, ptr: ptr, } - runtime.SetFinalizer(f, func(f *future) { C.fdb_future_destroy(f.ptr) }) return f } +type readySignal chan (struct{}) + +// blockUntilReady blocks the calling goroutine until the given Future is ready. +// This is a Go re-implementation of fdb_future_block_until_ready but it differs because +// it will return the Future's error if any was set. // Note: This function guarantees the callback will be executed **at most once**. -func fdb_future_block_until_ready(f *C.FDBFuture) { - if C.fdb_future_is_ready(f) != 0 { - return +func (f *future) blockUntilReady(ctx context.Context) error { + if C.fdb_future_is_ready(f.ptr) == 0 { + // The channel here is used as a signal that the callback is complete. + // The callback is responsible for closing it, and this function returns + // only after that has happened. + // + // See also: https://groups.google.com/forum/#!topic/golang-nuts/SPjQEcsdORA + rs := make(readySignal) + C.c_set_callback(unsafe.Pointer(f.ptr), unsafe.Pointer(&rs)) + + select { + case <-rs: + // future is ready (either with value or error) + break + case <-ctx.Done(): + // Note: future cancellation does not happen here but rather on the calling Get() + // when the future is closed + + return ctx.Err() + } } - // The mutex here is used as a signal that the callback is complete. - // We first lock it, then pass it to the callback, and then lock it - // again. The second call to lock won't return until the callback has - // fired. - // - // See https://groups.google.com/forum/#!topic/golang-nuts/SPjQEcsdORA - // for the history of why this pattern came to be used. - m := &sync.Mutex{} - m.Lock() - C.go_set_callback(unsafe.Pointer(f), unsafe.Pointer(m)) - m.Lock() -} + if err := C.fdb_future_get_error(f.ptr); err != 0 { + return Error{int(err)} + } -func (f *future) BlockUntilReady() { - defer runtime.KeepAlive(f) - fdb_future_block_until_ready(f.ptr) + return nil } func (f *future) IsReady() bool { - defer runtime.KeepAlive(f) return C.fdb_future_is_ready(f.ptr) != 0 } -func (f *future) Cancel() { - defer runtime.KeepAlive(f) - C.fdb_future_cancel(f.ptr) +// Close must be explicitly called for each future to avoid a memory leak. +func (f *future) Close() { + f.closer.Do(func() { + C.fdb_future_destroy(f.ptr) + }) } // FutureByteSlice represents the asynchronous result of a function that returns @@ -136,34 +127,37 @@ type FutureByteSlice interface { // if the asynchronous operation associated with this future did not // successfully complete. The current goroutine will be blocked until the // future is ready. - Get() ([]byte, error) + Get(context.Context) ([]byte, error) // MustGet returns a database value (or nil if there is no value), or panics // if the asynchronous operation associated with this future did not // successfully complete. The current goroutine will be blocked until the // future is ready. 
- MustGet() []byte + MustGet(context.Context) []byte Future } type futureByteSlice struct { *future - v []byte - e error - o sync.Once + v []byte + e error + getter sync.Once } -func (f *futureByteSlice) Get() ([]byte, error) { - f.o.Do(func() { - defer runtime.KeepAlive(f.future) +func (f *futureByteSlice) Get(ctx context.Context) ([]byte, error) { + f.getter.Do(func() { + defer f.Close() + + err := f.blockUntilReady(ctx) + if err != nil { + f.e = err + return + } var present C.fdb_bool_t var value *C.uint8_t var length C.int - - f.BlockUntilReady() - if err := C.fdb_future_get_value(f.ptr, &present, &value, &length); err != 0 { f.e = Error{int(err)} return @@ -172,15 +166,13 @@ func (f *futureByteSlice) Get() ([]byte, error) { if present != 0 { f.v = C.GoBytes(unsafe.Pointer(value), length) } - - C.fdb_future_release_memory(f.ptr) }) return f.v, f.e } -func (f *futureByteSlice) MustGet() []byte { - val, err := f.Get() +func (f *futureByteSlice) MustGet(ctx context.Context) []byte { + val, err := f.Get(ctx) if err != nil { panic(err) } @@ -194,46 +186,48 @@ type FutureKey interface { // Get returns a database key or an error if the asynchronous operation // associated with this future did not successfully complete. The current // goroutine will be blocked until the future is ready. - Get() (Key, error) + Get(context.Context) (Key, error) // MustGet returns a database key, or panics if the asynchronous operation // associated with this future did not successfully complete. The current // goroutine will be blocked until the future is ready. - MustGet() Key + MustGet(context.Context) Key Future } type futureKey struct { *future - k Key - e error - o sync.Once + k Key + e error + get sync.Once } -func (f *futureKey) Get() (Key, error) { - f.o.Do(func() { - defer runtime.KeepAlive(f.future) +func (f *futureKey) Get(ctx context.Context) (Key, error) { + f.get.Do(func() { + defer f.Close() + + err := f.blockUntilReady(ctx) + if err != nil { + f.e = err + return + } var value *C.uint8_t var length C.int - - f.BlockUntilReady() - if err := C.fdb_future_get_key(f.ptr, &value, &length); err != 0 { f.e = Error{int(err)} return } f.k = C.GoBytes(unsafe.Pointer(value), length) - C.fdb_future_release_memory(f.ptr) }) return f.k, f.e } -func (f *futureKey) MustGet() Key { - val, err := f.Get() +func (f *futureKey) MustGet(ctx context.Context) Key { + val, err := f.Get(ctx) if err != nil { panic(err) } @@ -247,39 +241,48 @@ type FutureNil interface { // Get returns an error if the asynchronous operation associated with this // future did not successfully complete. The current goroutine will be // blocked until the future is ready. - Get() error + Get(context.Context) error // MustGet panics if the asynchronous operation associated with this future // did not successfully complete. The current goroutine will be blocked // until the future is ready. 
- MustGet() + MustGet(context.Context) Future } type futureNil struct { *future + e error + get sync.Once } -func (f *futureNil) Get() error { - defer runtime.KeepAlive(f.future) +func (f *futureNil) Get(ctx context.Context) error { + f.get.Do(func() { + defer f.Close() - f.BlockUntilReady() - if err := C.fdb_future_get_error(f.ptr); err != 0 { - return Error{int(err)} - } + err := f.blockUntilReady(ctx) + if err != nil { + f.e = err + return + } + }) - return nil + return f.e } -func (f *futureNil) MustGet() { - if err := f.Get(); err != nil { +func (f *futureNil) MustGet(ctx context.Context) { + if err := f.Get(ctx); err != nil { panic(err) } } type futureKeyValueArray struct { *future + v []KeyValue + more bool + e error + o sync.Once } //go:nocheckptr @@ -295,29 +298,38 @@ func stringRefToSlice(ptr unsafe.Pointer) []byte { return C.GoBytes(src, size) } -func (f *futureKeyValueArray) Get() ([]KeyValue, bool, error) { - defer runtime.KeepAlive(f.future) - - f.BlockUntilReady() +func (f *futureKeyValueArray) Get(ctx context.Context) ([]KeyValue, bool, error) { + f.o.Do(func() { + err := f.blockUntilReady(ctx) + if err != nil { + f.e = err + return + } - var kvs *C.FDBKeyValue - var count C.int - var more C.fdb_bool_t + var kvs *C.FDBKeyValue + var count C.int + var more C.fdb_bool_t + if err := C.fdb_future_get_keyvalue_array(f.ptr, &kvs, &count, &more); err != 0 { + f.e = Error{int(err)} + return + } - if err := C.fdb_future_get_keyvalue_array(f.ptr, &kvs, &count, &more); err != 0 { - return nil, false, Error{int(err)} - } + f.v = make([]KeyValue, int(count)) + if more != 0 { + f.more = true + } - ret := make([]KeyValue, int(count)) + for i := 0; i < int(count); i++ { + kvptr := unsafe.Pointer(uintptr(unsafe.Pointer(kvs)) + uintptr(i*24)) - for i := 0; i < int(count); i++ { - kvptr := unsafe.Pointer(uintptr(unsafe.Pointer(kvs)) + uintptr(i*24)) + f.v[i].Key = stringRefToSlice(kvptr) + f.v[i].Value = stringRefToSlice(unsafe.Pointer(uintptr(kvptr) + 12)) + } - ret[i].Key = stringRefToSlice(kvptr) - ret[i].Value = stringRefToSlice(unsafe.Pointer(uintptr(kvptr) + 12)) - } + C.fdb_future_release_memory(f.ptr) }) - return ret, (more != 0), nil + return f.v, f.more, f.e } // FutureKeyArray represents the asynchronous result of a function @@ -328,43 +340,52 @@ type FutureKeyArray interface { // Get returns an array of keys or an error if the asynchronous operation // associated with this future did not successfully complete. The current // goroutine will be blocked until the future is ready. - Get() ([]Key, error) + Get(context.Context) ([]Key, error) // MustGet returns an array of keys, or panics if the asynchronous operations // associated with this future did not successfully complete. The current goroutine // will be blocked until the future is ready.
- MustGet() []Key + MustGet(context.Context) []Key } type futureKeyArray struct { *future + v []Key + e error + get sync.Once } -func (f *futureKeyArray) Get() ([]Key, error) { - defer runtime.KeepAlive(f.future) - - f.BlockUntilReady() +func (f *futureKeyArray) Get(ctx context.Context) ([]Key, error) { + f.get.Do(func() { + defer f.Close() - var ks *C.FDBKey - var count C.int + err := f.blockUntilReady(ctx) + if err != nil { + f.e = err + return + } - if err := C.fdb_future_get_key_array(f.ptr, &ks, &count); err != 0 { - return nil, Error{int(err)} - } + var ks *C.FDBKey + var count C.int + if err := C.fdb_future_get_key_array(f.ptr, &ks, &count); err != 0 { + f.e = Error{int(err)} + return + } - ret := make([]Key, int(count)) + f.v = make([]Key, int(count)) - for i := 0; i < int(count); i++ { - kptr := unsafe.Pointer(uintptr(unsafe.Pointer(ks)) + uintptr(i*12)) + for i := 0; i < int(count); i++ { + kptr := unsafe.Pointer(uintptr(unsafe.Pointer(ks)) + uintptr(i*12)) - ret[i] = stringRefToSlice(kptr) - } + f.v[i] = stringRefToSlice(kptr) + } + }) - return ret, nil + return f.v, f.e } -func (f *futureKeyArray) MustGet() []Key { - val, err := f.Get() +func (f *futureKeyArray) MustGet(ctx context.Context) []Key { + val, err := f.Get(ctx) if err != nil { panic(err) } @@ -378,35 +399,47 @@ type FutureInt64 interface { // Get returns a database version or an error if the asynchronous operation // associated with this future did not successfully complete. The current // goroutine will be blocked until the future is ready. - Get() (int64, error) + Get(context.Context) (int64, error) // MustGet returns a database version, or panics if the asynchronous // operation associated with this future did not successfully complete. The // current goroutine will be blocked until the future is ready. - MustGet() int64 + MustGet(context.Context) int64 Future } type futureInt64 struct { *future + v int64 + e error + get sync.Once } -func (f *futureInt64) Get() (int64, error) { - defer runtime.KeepAlive(f.future) +func (f *futureInt64) Get(ctx context.Context) (int64, error) { + f.get.Do(func() { + defer f.Close() + + err := f.blockUntilReady(ctx) + if err != nil { + f.e = err + return + } - f.BlockUntilReady() + var value C.int64_t + if err := C.fdb_future_get_int64(f.ptr, &value); err != 0 { + f.e = Error{int(err)} + return + } - var ver C.int64_t - if err := C.fdb_future_get_int64(f.ptr, &ver); err != 0 { - return 0, Error{int(err)} - } + f.v = int64(value) + }) - return int64(ver), nil + return f.v, f.e } -func (f *futureInt64) MustGet() int64 { - val, err := f.Get() +func (f *futureInt64) MustGet(ctx context.Context) int64 { + val, err := f.Get(ctx) if err != nil { panic(err) } @@ -421,43 +454,52 @@ type FutureStringSlice interface { // Get returns a slice of strings or an error if the asynchronous operation // associated with this future did not successfully complete. The current // goroutine will be blocked until the future is ready. - Get() ([]string, error) + Get(context.Context) ([]string, error) // MustGet returns a slice of strings or panics if the asynchronous // operation associated with this future did not successfully complete. The // current goroutine will be blocked until the future is ready. 
- MustGet() []string + MustGet(context.Context) []string Future } type futureStringSlice struct { *future + v []string + e error + get sync.Once } -func (f *futureStringSlice) Get() ([]string, error) { - defer runtime.KeepAlive(f.future) +func (f *futureStringSlice) Get(ctx context.Context) ([]string, error) { + f.get.Do(func() { + defer f.Close() - f.BlockUntilReady() - - var strings **C.char - var count C.int + err := f.blockUntilReady(ctx) + if err != nil { + f.e = err + return + } - if err := C.fdb_future_get_string_array(f.ptr, (***C.char)(unsafe.Pointer(&strings)), &count); err != 0 { - return nil, Error{int(err)} - } + var strings **C.char + var count C.int + if err := C.fdb_future_get_string_array(f.ptr, (***C.char)(unsafe.Pointer(&strings)), &count); err != 0 { + f.e = Error{int(err)} + return + } - ret := make([]string, int(count)) + f.v = make([]string, int(count)) - for i := 0; i < int(count); i++ { - ret[i] = C.GoString((*C.char)(*(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(strings)) + uintptr(i*8))))) - } + for i := 0; i < int(count); i++ { + f.v[i] = C.GoString((*C.char)(*(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(strings)) + uintptr(i*8))))) + } + }) - return ret, nil + return f.v, f.e } -func (f *futureStringSlice) MustGet() []string { - val, err := f.Get() +func (f *futureStringSlice) MustGet(ctx context.Context) []string { + val, err := f.Get(ctx) if err != nil { panic(err) } diff --git a/bindings/go/src/fdb/range.go b/bindings/go/src/fdb/range.go index fdb11ccb8b4..61aa4b672a0 100644 --- a/bindings/go/src/fdb/range.go +++ b/bindings/go/src/fdb/range.go @@ -23,6 +23,7 @@ package fdb import ( + "context" "fmt" ) @@ -124,18 +125,19 @@ type RangeResult struct { sr SelectorRange options RangeOptions snapshot bool - f *futureKeyValueArray } -// GetSliceWithError returns a slice of KeyValue objects satisfying the range +// Get returns a slice of KeyValue objects satisfying the range // specified in the read that returned this RangeResult, or an error if any of // the asynchronous operations associated with this result did not successfully // complete. The current goroutine will be blocked until all reads have // completed. -func (rr RangeResult) GetSliceWithError() ([]KeyValue, error) { +// Using Iterator() to streamline reads is preferable for efficiency reasons. +func (rr RangeResult) Get(ctx context.Context) ([]KeyValue, error) { var ret []KeyValue ri := rr.Iterator() + defer ri.Close() if rr.options.Limit != 0 { ri.options.Mode = StreamingModeExact @@ -143,25 +145,35 @@ func (rr RangeResult) GetSliceWithError() ([]KeyValue, error) { ri.options.Mode = StreamingModeWantAll } - for ri.Advance() { - if ri.err != nil { - return nil, ri.err + for { + // prefetch even if very little happens between iterations + kvs, err := ri.NextBatch(ctx, true) + if err != nil { + return nil, err + } + if len(kvs) == 0 { + break + } + + ret = append(ret, kvs...) + + if ri.options.Reverse { + ri.sr.End = FirstGreaterOrEqual(kvs[ri.lastBatchLen-1].Key) + } else { + ri.sr.Begin = FirstGreaterThan(kvs[ri.lastBatchLen-1].Key) } - ret = append(ret, ri.kvs...) - ri.index = len(ri.kvs) - ri.fetchNextBatch() } return ret, nil } -// GetSliceOrPanic returns a slice of KeyValue objects satisfying the range +// MustGet returns a slice of KeyValue objects satisfying the range // specified in the read that returned this RangeResult, or panics if any of the // asynchronous operations associated with this result did not successfully // complete. 
The current goroutine will be blocked until all reads have // completed. -func (rr RangeResult) GetSliceOrPanic() []KeyValue { - kvs, err := rr.GetSliceWithError() +func (rr RangeResult) MustGet(ctx context.Context) []KeyValue { + kvs, err := rr.Get(ctx) if err != nil { panic(err) } @@ -170,14 +182,15 @@ func (rr RangeResult) GetSliceOrPanic() []KeyValue { // Iterator returns a RangeIterator over the key-value pairs satisfying the // range specified in the read that returned this RangeResult. +// Close() must be called on this range iterator to avoid a memory leak. func (rr RangeResult) Iterator() *RangeIterator { return &RangeIterator{ t: rr.t, - f: rr.f, sr: rr.sr, options: rr.options, iteration: 1, snapshot: rr.snapshot, + more: true, } } @@ -185,104 +198,93 @@ func (rr RangeResult) Iterator() *RangeIterator { // objects) satisfying the range specified in a range read. RangeIterator is // constructed with the (RangeResult).Iterator method. // -// You must call Advance and get a true result prior to calling Get or MustGet. +// You must call NextBatch and get a result without error. // // RangeIterator should not be copied or used concurrently from multiple // goroutines, but multiple RangeIterators may be constructed from a single // RangeResult and used concurrently. RangeIterator should not be returned from // a transactional function passed to the Transact method of a Transactor. type RangeIterator struct { - t *transaction - f *futureKeyValueArray - sr SelectorRange - options RangeOptions - iteration int - done bool - more bool - kvs []KeyValue - index int - err error - snapshot bool + t *transaction + sr SelectorRange + options RangeOptions + iteration int + more bool + lastBatchLen int + snapshot bool + prefetchedBatch *futureKeyValueArray + prefetchOptions RangeOptions } -// Advance attempts to advance the iterator to the next key-value pair. Advance -// returns true if there are more key-value pairs satisfying the range, or false -// if the range has been exhausted. You must call this before every call to Get -// or MustGet. -func (ri *RangeIterator) Advance() bool { - if ri.done { - return false - } - - if ri.f == nil { - return true +// NextBatch advances the iterator to the next key-value pairs batch. +// It returns a slice of key-value pairs satisfying the range, an empty slice +// if the range has been exhausted, or an error if any of the asynchronous +// operations associated with this result did not successfully complete. +// If 'prefetch' is true, future for the next batch will be created on return and +// released when the RangeIterator is closed. 
+func (ri *RangeIterator) NextBatch(ctx context.Context, prefetch bool) ([]KeyValue, error) { + if !ri.more { + // iterator is done + return nil, nil } - ri.kvs, ri.more, ri.err = ri.f.Get() - ri.index = 0 - ri.f = nil + var f *futureKeyValueArray + if ri.prefetchedBatch != nil { + // update state for the prefetched read batch + ri.options = ri.prefetchOptions + ri.iteration++ - if ri.err != nil || len(ri.kvs) > 0 { - return true + f = ri.prefetchedBatch + ri.prefetchedBatch = nil + } else { + f = ri.fetchNextBatch() } - return false -} - -func (ri *RangeIterator) fetchNextBatch() { - if !ri.more || ri.index == ri.options.Limit { - ri.done = true - return + var err error + var kvs []KeyValue + kvs, ri.more, err = f.Get(ctx) + ri.lastBatchLen = len(kvs) + f.Close() + if err != nil { + // cannot re-use iterator after an error + ri.more = false + return nil, err } - if ri.options.Limit > 0 { - // Not worried about this being zero, checked equality above - ri.options.Limit -= ri.index + // if limit is zero then we can rely on the 'more' value returned by the C function + if ri.more && ri.options.Limit != 0 { + // if limit is non-zero then keep reading until Limit is zero + ri.more = ri.lastBatchLen < ri.options.Limit } - if ri.options.Reverse { - ri.sr.End = FirstGreaterOrEqual(ri.kvs[ri.index-1].Key) - } else { - ri.sr.Begin = FirstGreaterThan(ri.kvs[ri.index-1].Key) + if prefetch && ri.more { + ri.prefetchOptions = ri.options + if ri.prefetchOptions.Limit > 0 { + ri.prefetchOptions.Limit -= ri.lastBatchLen + } + f := ri.t.doGetRange(ri.sr, ri.prefetchOptions, ri.snapshot, ri.iteration+1) + ri.prefetchedBatch = &f } - ri.iteration++ - - f := ri.t.doGetRange(ri.sr, ri.options, ri.snapshot, ri.iteration) - ri.f = &f + return kvs, nil } -// Get returns the next KeyValue in a range read, or an error if one of the -// asynchronous operations associated with this range did not successfully -// complete. The Advance method of this RangeIterator must have returned true -// prior to calling Get. -func (ri *RangeIterator) Get() (kv KeyValue, err error) { - if ri.err != nil { - err = ri.err - return +func (ri *RangeIterator) fetchNextBatch() *futureKeyValueArray { + if ri.options.Limit > 0 { + // Not worried about this being zero, equality was checked by caller + ri.options.Limit -= ri.lastBatchLen } - kv = ri.kvs[ri.index] - - ri.index++ - - if ri.index == len(ri.kvs) { - ri.fetchNextBatch() - } + ri.iteration++ - return + f := ri.t.doGetRange(ri.sr, ri.options, ri.snapshot, ri.iteration) + return &f } -// MustGet returns the next KeyValue in a range read, or panics if one of the -// asynchronous operations associated with this range did not successfully -// complete. The Advance method of this RangeIterator must have returned true -// prior to calling MustGet. -func (ri *RangeIterator) MustGet() KeyValue { - kv, err := ri.Get() - if err != nil { - panic(err) +func (ri *RangeIterator) Close() { + if ri.prefetchedBatch != nil { + ri.prefetchedBatch.Close() } - return kv } // Strinc returns the first key that would sort outside the range prefixed by diff --git a/bindings/go/src/fdb/snapshot.go b/bindings/go/src/fdb/snapshot.go index 4a2d7095030..62f59790970 100644 --- a/bindings/go/src/fdb/snapshot.go +++ b/bindings/go/src/fdb/snapshot.go @@ -22,6 +22,8 @@ package fdb +import "context" + // Snapshot is a handle to a FoundationDB transaction snapshot, suitable for // performing snapshot reads. 
Snapshot reads offer a more relaxed isolation // level than FoundationDB's default serializable isolation, reducing @@ -33,6 +35,8 @@ type Snapshot struct { *transaction } +var noOpFunc = func() {} + // ReadTransact executes the caller-provided function, passing it the Snapshot // receiver object (as a ReadTransaction). // @@ -46,10 +50,17 @@ type Snapshot struct { // // See the ReadTransactor interface for an example of using ReadTransact with // Transaction, Snapshot and Database objects. -func (s Snapshot) ReadTransact(f func(ReadTransaction) (interface{}, error)) (r interface{}, err error) { +func (s Snapshot) ReadTransact(ctx context.Context, f func(ReadTransaction) (interface{}, error)) (r interface{}, txClose func(), err error) { defer panicToError(&err) + // NOTE: 'autoCancel' must be called outside of the defer function, so that the goroutine is started + ch := autoCancel(ctx, s) + defer close(ch) + r, err = f(s) + if err == nil { + txClose = noOpFunc + } return } @@ -66,11 +77,13 @@ func (s Snapshot) Snapshot() Snapshot { } // Get is equivalent to (Transaction).Get, performed as a snapshot read. +// Close() must be called on the returned future to avoid a memory leak. func (s Snapshot) Get(key KeyConvertible) FutureByteSlice { return s.get(key.FDBKey(), 1) } // GetKey is equivalent to (Transaction).GetKey, performed as a snapshot read. +// Close() must be called on the returned future to avoid a memory leak. func (s Snapshot) GetKey(sel Selectable) FutureKey { return s.getKey(sel.FDBKeySelector(), 1) } @@ -83,6 +96,7 @@ func (s Snapshot) GetRange(r Range, options RangeOptions) RangeResult { // GetReadVersion is equivalent to (Transaction).GetReadVersion, performed as // a snapshot read. +// Close() must be called on the returned future to avoid a memory leak. func (s Snapshot) GetReadVersion() FutureInt64 { return s.getReadVersion() } @@ -95,6 +109,7 @@ func (s Snapshot) GetDatabase() Database { // GetEstimatedRangeSizeBytes returns an estimate for the number of bytes // stored in the given range. +// Close() must be called on the returned future to avoid a memory leak. func (s Snapshot) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 { beginKey, endKey := r.FDBRangeKeys() return s.getEstimatedRangeSizeBytes( diff --git a/bindings/go/src/fdb/transaction.go b/bindings/go/src/fdb/transaction.go index 3fd0ab10fe4..b8bbacef0c3 100644 --- a/bindings/go/src/fdb/transaction.go +++ b/bindings/go/src/fdb/transaction.go @@ -25,6 +25,7 @@ package fdb // #define FDB_API_VERSION 740 // #include import "C" +import "context" // A ReadTransaction can asynchronously read from a FoundationDB // database. Transaction and Snapshot both satisfy the ReadTransaction @@ -42,11 +43,16 @@ type ReadTransaction interface { GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 GetRangeSplitPoints(r ExactRange, chunkSize int64) FutureKeyArray Options() TransactionOptions - Cancel() + + CancellableTransaction ReadTransactor } +type CancellableTransaction interface { + Cancel() +} + // Transaction is a handle to a FoundationDB transaction. Transaction is a // lightweight object that may be efficiently copied, and is safe for concurrent // use by multiple goroutines. @@ -88,7 +94,9 @@ func (opt TransactionOptions) setOpt(code int, param []byte) error { }, param) } -func (t *transaction) destroy() { +// Close will destroy the underlying transaction. +// It must be called after all the transaction-associated futures have been closed. 
+func (t *transaction) Close() { C.fdb_transaction_destroy(t.ptr) } @@ -116,10 +124,17 @@ func (t Transaction) GetDatabase() Database { // // See the Transactor interface for an example of using Transact with // Transaction and Database objects. -func (t Transaction) Transact(f func(Transaction) (interface{}, error)) (r interface{}, err error) { +func (t Transaction) Transact(ctx context.Context, f func(Transaction) (interface{}, error)) (r interface{}, txClose func(), err error) { defer panicToError(&err) + // NOTE: 'autoCancel' must be called outside of the defer function, so that the goroutine is started + ch := autoCancel(ctx, t) + defer close(ch) + r, err = f(t) + if err == nil { + txClose = noOpFunc + } return } @@ -136,10 +151,17 @@ func (t Transaction) Transact(f func(Transaction) (interface{}, error)) (r inter // // See the ReadTransactor interface for an example of using ReadTransact with // Transaction, Snapshot and Database objects. -func (t Transaction) ReadTransact(f func(ReadTransaction) (interface{}, error)) (r interface{}, err error) { +func (t Transaction) ReadTransact(ctx context.Context, f func(ReadTransaction) (interface{}, error)) (r interface{}, txClose func(), err error) { defer panicToError(&err) + // NOTE: 'autoCancel' must be called outside of the defer function, so that the goroutine is started + ch := autoCancel(ctx, t) + defer close(ch) + r, err = f(t) + if err == nil { + txClose = noOpFunc + } return } @@ -189,9 +211,10 @@ func (t Transaction) Snapshot() Snapshot { // // Typical code will not use OnError directly. (Database).Transact uses // OnError internally to implement a correct retry loop. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) OnError(err Error) FutureNil { return &futureNil{ - future: newFuture(t.transaction, C.fdb_transaction_on_error(t.ptr, C.fdb_error_t(err.Code))), + future: newFuture(C.fdb_transaction_on_error(t.ptr, C.fdb_error_t(err.Code))), } } @@ -205,9 +228,10 @@ func (t Transaction) OnError(err Error) FutureNil { // be unable to determine whether a transaction succeeded. For more information, // see // https://apple.github.io/foundationdb/developer-guide.html#transactions-with-unknown-results. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) Commit() FutureNil { return &futureNil{ - future: newFuture(t.transaction, C.fdb_transaction_commit(t.ptr)), + future: newFuture(C.fdb_transaction_commit(t.ptr)), } } @@ -240,16 +264,17 @@ func (t Transaction) Commit() FutureNil { // changed using SetMaxWatches on the Database. Because a watch outlives the // transaction that creates it, any watch that is no longer needed should be // cancelled by calling (FutureNil).Cancel on its returned future. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) Watch(key KeyConvertible) FutureNil { kb := key.FDBKey() return &futureNil{ - future: newFuture(t.transaction, C.fdb_transaction_watch(t.ptr, byteSliceToPtr(kb), C.int(len(kb)))), + future: newFuture(C.fdb_transaction_watch(t.ptr, byteSliceToPtr(kb), C.int(len(kb)))), } } func (t *transaction) get(key []byte, snapshot int) FutureByteSlice { return &futureByteSlice{ - future: newFuture(t, C.fdb_transaction_get( + future: newFuture(C.fdb_transaction_get( t.ptr, byteSliceToPtr(key), C.int(len(key)), @@ -261,6 +286,7 @@ func (t *transaction) get(key []byte, snapshot int) FutureByteSlice { // Get returns the (future) value associated with the specified key. 
The read is // performed asynchronously and does not block the calling goroutine. The future // will become ready when the read is complete. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) Get(key KeyConvertible) FutureByteSlice { return t.get(key.FDBKey(), 0) } @@ -273,7 +299,7 @@ func (t *transaction) doGetRange(r Range, options RangeOptions, snapshot bool, i ekey := esel.Key.FDBKey() return futureKeyValueArray{ - future: newFuture(t, C.fdb_transaction_get_range( + future: newFuture(C.fdb_transaction_get_range( t.ptr, byteSliceToPtr(bkey), C.int(len(bkey)), @@ -293,14 +319,12 @@ func (t *transaction) doGetRange(r Range, options RangeOptions, snapshot bool, i } func (t *transaction) getRange(r Range, options RangeOptions, snapshot bool) RangeResult { - f := t.doGetRange(r, options, snapshot, 1) begin, end := r.FDBRangeKeySelectors() return RangeResult{ t: t, sr: SelectorRange{begin, end}, options: options, snapshot: snapshot, - f: &f, } } @@ -315,7 +339,7 @@ func (t Transaction) GetRange(r Range, options RangeOptions) RangeResult { func (t *transaction) getEstimatedRangeSizeBytes(beginKey Key, endKey Key) FutureInt64 { return &futureInt64{ - future: newFuture(t, C.fdb_transaction_get_estimated_range_size_bytes( + future: newFuture(C.fdb_transaction_get_estimated_range_size_bytes( t.ptr, byteSliceToPtr(beginKey), C.int(len(beginKey)), @@ -333,6 +357,7 @@ func (t *transaction) getEstimatedRangeSizeBytes(beginKey Key, endKey Key) Futur // that reason it is recommended to use this API to query against large ranges for accuracy considerations. // For a rough reference, if the returned size is larger than 3MB, one can consider the size to be // accurate. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 { beginKey, endKey := r.FDBRangeKeys() return t.getEstimatedRangeSizeBytes( @@ -343,7 +368,7 @@ func (t Transaction) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 { func (t *transaction) getRangeSplitPoints(beginKey Key, endKey Key, chunkSize int64) FutureKeyArray { return &futureKeyArray{ - future: newFuture(t, C.fdb_transaction_get_range_split_points( + future: newFuture(C.fdb_transaction_get_range_split_points( t.ptr, byteSliceToPtr(beginKey), C.int(len(beginKey)), @@ -357,6 +382,7 @@ func (t *transaction) getRangeSplitPoints(beginKey Key, endKey Key, chunkSize in // GetRangeSplitPoints returns a list of keys that can split the given range // into (roughly) equally sized chunks based on chunkSize. // Note: the returned split points contain the start key and end key of the given range. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) GetRangeSplitPoints(r ExactRange, chunkSize int64) FutureKeyArray { beginKey, endKey := r.FDBRangeKeys() return t.getRangeSplitPoints( @@ -368,13 +394,14 @@ func (t Transaction) GetRangeSplitPoints(r ExactRange, chunkSize int64) FutureKe func (t *transaction) getReadVersion() FutureInt64 { return &futureInt64{ - future: newFuture(t, C.fdb_transaction_get_read_version(t.ptr)), + future: newFuture(C.fdb_transaction_get_read_version(t.ptr)), } } // (Infrequently used) GetReadVersion returns the (future) transaction read version. The read is // performed asynchronously and does not block the calling goroutine. The future // will become ready when the read version is available. +// Close() must be called on the returned future to avoid a memory leak. 
func (t Transaction) GetReadVersion() FutureInt64 { return t.getReadVersion() } @@ -434,19 +461,21 @@ func (t Transaction) GetCommittedVersion() (int64, error) { // committed and will result in the future completing with an error. Keep in // mind that a transaction which reads keys and then sets them to their current // values may be optimized to a read-only transaction. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) GetVersionstamp() FutureKey { - return &futureKey{future: newFuture(t.transaction, C.fdb_transaction_get_versionstamp(t.ptr))} + return &futureKey{future: newFuture(C.fdb_transaction_get_versionstamp(t.ptr))} } func (t *transaction) getApproximateSize() FutureInt64 { return &futureInt64{ - future: newFuture(t, C.fdb_transaction_get_approximate_size(t.ptr)), + future: newFuture(C.fdb_transaction_get_approximate_size(t.ptr)), } } // Returns a future that is the approximate transaction size so far in this // transaction, which is the summation of the estimated size of mutations, // read conflict ranges, and write conflict ranges. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) GetApproximateSize() FutureInt64 { return t.getApproximateSize() } @@ -468,7 +497,7 @@ func boolToInt(b bool) int { func (t *transaction) getKey(sel KeySelector, snapshot int) FutureKey { key := sel.Key.FDBKey() return &futureKey{ - future: newFuture(t, C.fdb_transaction_get_key( + future: newFuture(C.fdb_transaction_get_key( t.ptr, byteSliceToPtr(key), C.int(len(key)), @@ -488,6 +517,7 @@ func (t *transaction) getKey(sel KeySelector, snapshot int) FutureKey { // retrieved, using network bandwidth. Invoking // (TransactionOptions).SetReadYourWritesDisable will avoid both the caching and // the increased network bandwidth. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) GetKey(sel Selectable) FutureKey { return t.getKey(sel.FDBKeySelector(), 0) } @@ -587,7 +617,7 @@ func (t Transaction) Options() TransactionOptions { func localityGetAddressesForKey(t *transaction, key KeyConvertible) FutureStringSlice { kb := key.FDBKey() return &futureStringSlice{ - future: newFuture(t, C.fdb_transaction_get_addresses_for_key( + future: newFuture(C.fdb_transaction_get_addresses_for_key( t.ptr, byteSliceToPtr(kb), C.int(len(kb)), @@ -599,6 +629,7 @@ func localityGetAddressesForKey(t *transaction, key KeyConvertible) FutureString // each of the storage servers responsible for storing key and its associated // value. The read is performed asynchronously and does not block the calling // goroutine. The future will become ready when the read is complete. +// Close() must be called on the returned future to avoid a memory leak. func (t Transaction) LocalityGetAddressesForKey(key KeyConvertible) FutureStringSlice { return localityGetAddressesForKey(t.transaction, key) }
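
The changes above alter the public contract of the Go binding: Transact and ReadTransact take a context.Context and return an extra close function, and every future is read with a context and released with Close. The following minimal usage sketch is not part of the patch; it assumes Database implements the updated Transactor interface shown above, and the key name and the API version literal (740, matching FDB_API_VERSION in the patch) are purely illustrative.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

func main() {
	fdb.MustAPIVersion(740) // assumed to match FDB_API_VERSION used in the patch
	db := fdb.MustOpenDefault()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Transact now takes a context and returns a close function in addition to
	// the result and error. The close function is nil on error and must be
	// called once every future created inside the transaction has been closed.
	ret, txClose, err := db.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) {
		tr.Set(fdb.Key("hello"), []byte("world"))

		f := tr.Get(fdb.Key("hello")) // caller is responsible for closing this future
		defer f.Close()
		return f.MustGet(ctx), nil
	})
	if err != nil {
		panic(err)
	}
	txClose()

	fmt.Printf("hello = %s\n", ret.([]byte))
}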
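
The range.go changes replace Advance/Get with the batch-oriented NextBatch and require the iterator to be closed. Below is a sketch of a prefix scan under those assumptions; it presumes an existing Database handle and the updated ReadTransact signature, and the function name and prefix value are illustrative only.

package example

import (
	"context"
	"fmt"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

// printPrefix drains a range read batch by batch using the new iterator API.
func printPrefix(ctx context.Context, db fdb.Database, prefix []byte) error {
	_, txClose, err := db.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) {
		pr, err := fdb.PrefixRange(prefix)
		if err != nil {
			return nil, err
		}

		ri := rtr.GetRange(pr, fdb.RangeOptions{}).Iterator()
		defer ri.Close() // releases any prefetched batch

		for {
			// prefetch=true lets the iterator start reading the next batch early
			kvs, err := ri.NextBatch(ctx, true)
			if err != nil {
				return nil, err
			}
			if len(kvs) == 0 {
				return nil, nil // range exhausted
			}
			for _, kv := range kvs {
				fmt.Printf("%s is %s\n", kv.Key, kv.Value)
			}
		}
	})
	if err != nil {
		return err
	}
	txClose()
	return nil
}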
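
Because blockUntilReady selects on ctx.Done(), a blocked Get now fails with the context's error once the deadline expires, and ReadTransact returns a nil close function whenever it reports an error, as the error-wrapping test above exercises. A hedged sketch of handling that contract follows; it assumes a Database handle and that non-FoundationDB errors propagate out of the retry loop unchanged, and the helper name and timeout are illustrative.

package example

import (
	"context"
	"time"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

// readWithDeadline reads a single key, giving up when the context deadline passes.
func readWithDeadline(db fdb.Database, key fdb.Key) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	ret, txClose, err := db.ReadTransact(ctx, func(rtr fdb.ReadTransaction) (interface{}, error) {
		f := rtr.Get(key)
		defer f.Close()

		// Get returns ctx.Err() if the deadline expires before the future is ready.
		b, err := f.Get(ctx)
		if err != nil {
			return nil, err
		}
		return b, nil
	})
	if err != nil {
		// txClose is nil when ReadTransact reports an error, so it must not be called here.
		return nil, err
	}
	txClose()

	return ret.([]byte), nil
}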