Skip to content

Commit b68c700

Browse files
committed
Go: introduce context for Transact/ReadTransact
Use a goroutine to automatically cancel transaction when context has an error. Correctly close transactions by returning a function to caller.
1 parent 534b1fe commit b68c700

File tree

13 files changed

+326
-182
lines changed

13 files changed

+326
-182
lines changed

bindings/go/src/_stacktester/directory.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package main
2222

2323
import (
2424
"bytes"
25+
"context"
2526
"strings"
2627

2728
"github.com/apple/foundationdb/bindings/go/src/fdb"
@@ -105,7 +106,7 @@ var createOps = map[string]bool{
105106
"OPEN_SUBSPACE": true,
106107
}
107108

108-
func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool, idx int, t fdb.Transactor, rt fdb.ReadTransactor) {
109+
func (de *DirectoryExtension) processOp(ctx context.Context, sm *StackMachine, op string, isDB bool, idx int, t fdb.Transactor, rt fdb.ReadTransactor) {
109110
defer func() {
110111
if r := recover(); r != nil {
111112
sm.store(idx, []byte("DIRECTORY_ERROR"))
@@ -142,7 +143,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
142143
if l != nil {
143144
layer = l.([]byte)
144145
}
145-
d, err := de.cwd().CreateOrOpen(t, tupleToPath(tuples[0]), layer)
146+
d, err := de.cwd().CreateOrOpen(ctx, t, tupleToPath(tuples[0]), layer)
146147
if err != nil {
147148
panic(err)
148149
}
@@ -157,10 +158,10 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
157158
p := sm.waitAndPop().item
158159
var d directory.Directory
159160
if p == nil {
160-
d, err = de.cwd().Create(t, tupleToPath(tuples[0]), layer)
161+
d, err = de.cwd().Create(ctx, t, tupleToPath(tuples[0]), layer)
161162
} else {
162163
// p.([]byte) itself may be nil, but CreatePrefix handles that appropriately
163-
d, err = de.cwd().CreatePrefix(t, tupleToPath(tuples[0]), layer, p.([]byte))
164+
d, err = de.cwd().CreatePrefix(ctx, t, tupleToPath(tuples[0]), layer, p.([]byte))
164165
}
165166
if err != nil {
166167
panic(err)
@@ -173,7 +174,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
173174
if l != nil {
174175
layer = l.([]byte)
175176
}
176-
d, err := de.cwd().Open(rt, tupleToPath(tuples[0]), layer)
177+
d, err := de.cwd().Open(ctx, rt, tupleToPath(tuples[0]), layer)
177178
if err != nil {
178179
panic(err)
179180
}
@@ -188,14 +189,14 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
188189
de.errorIndex = sm.waitAndPop().item.(int64)
189190
case op == "MOVE":
190191
tuples := sm.popTuples(2)
191-
d, err := de.cwd().Move(t, tupleToPath(tuples[0]), tupleToPath(tuples[1]))
192+
d, err := de.cwd().Move(ctx, t, tupleToPath(tuples[0]), tupleToPath(tuples[1]))
192193
if err != nil {
193194
panic(err)
194195
}
195196
de.store(d)
196197
case op == "MOVE_TO":
197198
tuples := sm.popTuples(1)
198-
d, err := de.cwd().MoveTo(t, tupleToPath(tuples[0]))
199+
d, err := de.cwd().MoveTo(ctx, t, tupleToPath(tuples[0]))
199200
if err != nil {
200201
panic(err)
201202
}
@@ -208,8 +209,8 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
208209
// doesn't end up committing the version key. (Other languages have
209210
// separate remove() and remove_if_exists() so don't have this tricky
210211
// issue).
211-
_, err := t.Transact(func(tr fdb.Transaction) (interface{}, error) {
212-
ok, err := de.cwd().Remove(tr, path)
212+
_, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) {
213+
ok, err := de.cwd().Remove(ctx, tr, path)
213214
if err != nil {
214215
panic(err)
215216
}
@@ -225,8 +226,9 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
225226
if err != nil {
226227
panic(err)
227228
}
229+
txClose()
228230
case op == "LIST":
229-
subs, err := de.cwd().List(rt, sm.maybePath())
231+
subs, err := de.cwd().List(ctx, rt, sm.maybePath())
230232
if err != nil {
231233
panic(err)
232234
}
@@ -236,7 +238,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
236238
}
237239
sm.store(idx, t.Pack())
238240
case op == "EXISTS":
239-
b, err := de.cwd().Exists(rt, sm.maybePath())
241+
b, err := de.cwd().Exists(ctx, rt, sm.maybePath())
240242
if err != nil {
241243
panic(err)
242244
}
@@ -275,10 +277,14 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
275277
k := sm.waitAndPop().item.([]byte)
276278
k = append(k, tuple.Tuple{de.index}.Pack()...)
277279
v := de.css().Bytes()
278-
t.Transact(func(tr fdb.Transaction) (interface{}, error) {
280+
_, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) {
279281
tr.Set(fdb.Key(k), v)
280282
return nil, nil
281283
})
284+
if err != nil {
285+
panic(err)
286+
}
287+
txClose()
282288
case op == "LOG_DIRECTORY":
283289
rp := sm.waitAndPop().item.([]byte)
284290
ss := subspace.FromBytes(rp).Sub(de.index)
@@ -288,7 +294,7 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
288294
v2 := tuple.Tuple{de.cwd().GetLayer()}.Pack()
289295
k3 := ss.Pack(tuple.Tuple{"exists"})
290296
var v3 []byte
291-
exists, err := de.cwd().Exists(rt, nil)
297+
exists, err := de.cwd().Exists(ctx, rt, nil)
292298
if err != nil {
293299
panic(err)
294300
}
@@ -300,19 +306,23 @@ func (de *DirectoryExtension) processOp(sm *StackMachine, op string, isDB bool,
300306
k4 := ss.Pack(tuple.Tuple{"children"})
301307
var subs []string
302308
if exists {
303-
subs, err = de.cwd().List(rt, nil)
309+
subs, err = de.cwd().List(ctx, rt, nil)
304310
if err != nil {
305311
panic(err)
306312
}
307313
}
308314
v4 := tuplePackStrings(subs)
309-
t.Transact(func(tr fdb.Transaction) (interface{}, error) {
315+
_, txClose, err := t.Transact(ctx, func(tr fdb.Transaction) (interface{}, error) {
310316
tr.Set(k1, v1)
311317
tr.Set(k2, v2)
312318
tr.Set(k3, v3)
313319
tr.Set(k4, v4)
314320
return nil, nil
315321
})
322+
if err != nil {
323+
panic(err)
324+
}
325+
txClose()
316326
case op == "STRIP_PREFIX":
317327
ba := sm.waitAndPop().item.([]byte)
318328
ssb := de.css().Bytes()

0 commit comments

Comments
 (0)