-
Notifications
You must be signed in to change notification settings - Fork 17
/
stm.go
348 lines (312 loc) · 9.16 KB
/
stm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
package gotomic
import (
"bytes"
"fmt"
"sort"
"sync/atomic"
"unsafe"
)
/*
The states a Transaction moves through, stored in Transaction.status.
*/
const (
// undecided is the status every Transaction is created with.
undecided = iota
// read_check is entered by commit after the write set has been acquired.
read_check
// successful is the final status of a Transaction whose commit went through.
successful
// failed is the final status of a Transaction that was aborted.
failed
)
/*
lastCommit is incremented atomically by commit to hand out commit numbers.
NewTransaction loads it to initialise the commitNumber of a new Transaction.
*/
var lastCommit uint64 = 0
/*
lastBegin is incremented atomically by NewTransaction to hand out begin numbers.
*/
var lastBegin uint64 = 0
/*
Clonable types can be handled by the transaction layer.
Clone is called every time a version is copied (see version.clone), which happens
when a Transaction opens a Handle for reading or writing and when it locks a Handle.
*/
type Clonable interface {
// Clone returns the copy that will be stored in the new version.
Clone() Clonable
}
/*
Handle wraps any type of data that is supposed to be handled by the transaction layer.
Use NewHandle to create one.
*/
type Handle struct {
/*
Will point to a version.
It is loaded and swapped with sync/atomic operations (see getVersion and replace).
*/
Pointer unsafe.Pointer
}
/*
NewHandle wraps the Clonable c in a fresh Handle so that it can be used by the transaction layer.
The initial version has commit number 0 and is not locked by any Transaction.
*/
func NewHandle(c Clonable) *Handle {
	initial := &version{
		commitNumber: 0,
		lockedBy:     nil,
		content:      c,
	}
	return &Handle{Pointer: unsafe.Pointer(initial)}
}
/*
Current returns the content of the version this Handle points to right now.
No transactional state is taken into account.
*/
func (self *Handle) Current() Clonable {
	latest := self.getVersion()
	return latest.content
}
/*
getVersion atomically loads the version this Handle currently points to.
*/
func (self *Handle) getVersion() *version {
	raw := atomic.LoadPointer(&self.Pointer)
	return (*version)(raw)
}
/*
replace atomically makes this Handle point to neu, but only if it still points to old.
It returns whether the swap took place.
*/
func (self *Handle) replace(old, neu *version) bool {
	expected, desired := unsafe.Pointer(old), unsafe.Pointer(neu)
	return atomic.CompareAndSwapPointer(&self.Pointer, expected, desired)
}
/*
version is one state of the data behind a Handle: the content together with
the bookkeeping the transaction layer needs for it.
*/
type version struct {
/*
The commit number of the transaction that created this version.
It is 0 for the version created by NewHandle, and is read and written with sync/atomic.
*/
commitNumber uint64
/*
The transaction (or nil) having locked this version.
Set by Transaction.acquire on a clone before that clone is installed in the Handle.
*/
lockedBy *Transaction
/*
The content in this version.
*/
content Clonable
}
/*
clone returns a new, unlocked version that carries the same commit number as
this one and a Clone of its content.
*/
func (self *version) clone() *version {
	return &version{
		commitNumber: atomic.LoadUint64(&self.commitNumber),
		lockedBy:     nil,
		content:      self.content.Clone(),
	}
}
/*
snapshot is what a Transaction remembers about a Handle it has opened.
*/
type snapshot struct {
// old is the version returned by objRead when the Handle was opened.
old *version
// neu is the Transaction's own clone of old; its content is what Read and Write hand out.
neu *version
}
/*
write pairs a Handle opened for writing with the snapshot taken of it.
It is the element type of writes.
*/
type write struct {
// handle is the Handle that is to be updated.
handle *Handle
// snapshot holds the version seen when handle was opened and the version to install.
snapshot *snapshot
}
/*
writes is a list of write entries that can be sorted by the memory address of
their handles.
*/
type writes []write

/*
Len returns the number of entries.
*/
func (self writes) Len() int {
	count := len(self)
	return count
}

/*
Swap exchanges the entries at positions i and j.
*/
func (self writes) Swap(i, j int) {
	held := self[i]
	self[i] = self[j]
	self[j] = held
}

/*
Less reports whether the handle of entry i has a lower address than the handle of entry j.
*/
func (self writes) Less(i, j int) bool {
	left := uintptr(unsafe.Pointer(self[i].handle))
	right := uintptr(unsafe.Pointer(self[j].handle))
	return left < right
}
/*
Transaction is based on "Concurrent Programming Without Locks" by Keir Fraser and Tim Harris <http://www.cl.cam.ac.uk/research/srg/netos/papers/2007-cpwl.pdf>
It has a few tweaks that I don't believe break it (but I haven't even tried proving it):
1) It has an ever increasing counter for the last transaction to commit.
It uses this counter to fail transactions fast when they try to read a value that another
transaction has changed since the first transaction began.
2) It copies the data not only on write opening, but also on read opening.
These changes will make the transactions act more along the lines of "Sandboxing Transactional Memory" by Luke Dalessandro and Michael L. Scott <http://www.cs.rochester.edu/u/scott/papers/2012_TRANSACT_sandboxing.pdf> and will hopefully avoid the need to kill transactions exhibiting invalid behaviour due to inconsistent states.
*/
type Transaction struct {
/*
Number handed out from the steadily incrementing lastBegin counter when this Transaction was created.
objRead compares the begin numbers of two Transactions that are both in read_check.
*/
beginNumber uint64
/*
Initialised from lastCommit when the Transaction is created; commit stores a freshly
incremented lastCommit here when it moves the status to read_check.
objRead refuses versions whose commit number is greater than this value.
Read and written with sync/atomic.
*/
commitNumber uint64
// One of undecided, read_check, successful and failed. Read and written with sync/atomic.
status int32
// Snapshots of the Handles opened with Read, keyed by Handle.
readHandles map[*Handle]*snapshot
// Snapshots of the Handles opened with Write, keyed by Handle.
writeHandles map[*Handle]*snapshot
// The entries of writeHandles ordered by Handle address; filled in by sortWrites.
sortedWrites writes
}
/*
NewTransaction returns an undecided Transaction with empty read and write sets.
It takes the next begin number from lastBegin and remembers the value lastCommit
had at creation time as its commit number.
*/
func NewTransaction() *Transaction {
	// The begin number is drawn before the commit counter is sampled.
	begin := atomic.AddUint64(&lastBegin, 1)
	seenCommit := atomic.LoadUint64(&lastCommit)
	return &Transaction{
		beginNumber:  begin,
		commitNumber: seenCommit,
		status:       undecided,
		readHandles:  make(map[*Handle]*snapshot),
		writeHandles: make(map[*Handle]*snapshot),
		sortedWrites: nil,
	}
}
/*
getStatus atomically loads the current status of this Transaction.
*/
func (self *Transaction) getStatus() int32 {
	current := atomic.LoadInt32(&self.status)
	return current
}
/*
objRead fetches the version of h that this Transaction may work on.

If the current version is locked by another Transaction, that Transaction is
dealt with first: it is aborted when both Transactions are in read_check and this
one has the lower begin number, otherwise it is helped to commit. If it ended up
successful, the version it wrote is used and stamped with its commit number.

An error is returned when the resulting version has a commit number greater than
the commit number of this Transaction.
*/
func (self *Transaction) objRead(h *Handle) (rval *version, err error) {
	current := h.getVersion()
	if owner := current.lockedBy; owner != nil {
		if owner.getStatus() == read_check && self.getStatus() == read_check && self.beginNumber < owner.beginNumber {
			owner.Abort()
		} else {
			owner.commit()
		}
		if owner.getStatus() == successful {
			// The owner won: continue with the version it is installing.
			current = owner.writeHandles[h].neu
			atomic.StoreUint64(&current.commitNumber, atomic.LoadUint64(&owner.commitNumber))
		}
	}
	if atomic.LoadUint64(&current.commitNumber) > atomic.LoadUint64(&self.commitNumber) {
		return nil, fmt.Errorf("%v has changed", current.content)
	}
	return current, nil
}
/*
sortWrites appends one entry per element of writeHandles to sortedWrites and
then sorts sortedWrites. writeHandles itself is left as it is.
_not_ safe to call multiple times: every call appends all entries again!
*/
func (self *Transaction) sortWrites() {
	pending := self.sortedWrites
	for h, snap := range self.writeHandles {
		pending = append(pending, write{handle: h, snapshot: snap})
	}
	sort.Sort(pending)
	self.sortedWrites = pending
}
/*
release unlocks every Handle in sortedWrites that is still locked by this Transaction.
A successful Transaction installs its new version, stamped with its commit number;
any other status puts the old version back.
*/
func (self *Transaction) release() {
	outcome := self.getStatus()
	for i := range self.sortedWrites {
		entry := self.sortedWrites[i]
		held := entry.handle.getVersion()
		if held.lockedBy != self {
			// Not locked by us (any more), nothing to release.
			continue
		}
		target := entry.snapshot.old
		if outcome == successful {
			target = entry.snapshot.neu
			atomic.StoreUint64(&target.commitNumber, atomic.LoadUint64(&self.commitNumber))
		}
		// The result is ignored: if the swap fails the Handle no longer holds the version we saw.
		entry.handle.replace(held, target)
	}
}
/*
acquire tries to lock every Handle in sortedWrites, in order, by swapping the
version seen when the Handle was opened for a clone marked as locked by this Transaction.

When a swap fails and the Handle is not already locked by this Transaction:
the result is true if this Transaction has meanwhile reached read_check or successful,
false if it has failed or if the Handle is not locked by anybody; otherwise the
Transaction holding the lock is helped to commit and the swap is tried again.

Returns true when all Handles were locked.
*/
func (self *Transaction) acquire() bool {
	for _, entry := range self.sortedWrites {
		for {
			locked := entry.snapshot.old.clone()
			locked.lockedBy = self
			if entry.handle.replace(entry.snapshot.old, locked) {
				break
			}
			holder := entry.handle.getVersion()
			if holder.lockedBy == self {
				break
			}
			switch self.getStatus() {
			case read_check, successful:
				return true
			case failed:
				return false
			}
			if holder.lockedBy == nil {
				return false
			}
			holder.lockedBy.commit()
		}
	}
	return true
}
/*
readCheck verifies that every Handle in readHandles still points to the version
that was seen when it was opened.
On the first mismatch it reports whether this Transaction is successful anyway.
*/
func (self *Transaction) readCheck() bool {
	for h, snap := range self.readHandles {
		if h.getVersion() == snap.old {
			continue
		}
		return self.getStatus() == successful
	}
	return true
}
/*
commit drives the transaction towards a final status, mutating the Transaction
only through atomic operations. This makes it usable by other Transactions that
want to help this one along.
Safe to call multiple times (and it _will_ be if we have contention).
Returns whether the Transaction is successful.
*/
func (self *Transaction) commit() bool {
	if acquired := self.acquire(); !acquired {
		self.Abort()
		return false
	}
	defer self.release()
	// Only the caller that moves the status to read_check draws the commit number.
	if atomic.CompareAndSwapInt32(&self.status, undecided, read_check) {
		number := atomic.AddUint64(&lastCommit, 1)
		atomic.StoreUint64(&self.commitNumber, number)
	}
	if consistent := self.readCheck(); !consistent {
		self.Abort()
		return false
	}
	atomic.CompareAndSwapInt32(&self.status, read_check, successful)
	return self.getStatus() == successful
}
/*
Commit the transaction. Will return whether the commit was successful or not.
Safe to call multiple times, but only from one thread.

An undecided Transaction gets its writes sorted before the commit is attempted.
A Transaction in read_check continues the commit.
A Transaction that is already successful or failed just reports that outcome.
Panics if the status is none of the known states.
*/
func (self *Transaction) Commit() bool {
	switch status := self.getStatus(); status {
	case undecided:
		self.sortWrites()
		return self.commit()
	case read_check:
		return self.commit()
	case successful:
		return true
	case failed:
		return false
	default:
		// The format verb needs the transaction as its argument.
		panic(fmt.Errorf("%#v has illegal state!", self))
	}
}
/*
Abort marks the transaction as failed, unless it is already successful, and
then releases whatever it still has locked.
Safe to call multiple times.
Unless the transaction is half-committed Abort isn't really necessary, the gc will clean it up properly.
*/
func (self *Transaction) Abort() {
	for {
		seen := self.getStatus()
		if seen == successful || seen == failed {
			break
		}
		// Retry if the status changed between the load and the swap.
		atomic.CompareAndSwapInt32(&self.status, seen, failed)
	}
	self.release()
}
/*
Describe returns a multi line, human readable description of the transaction:
its address, begin and commit numbers, followed by one line per Handle opened
for reading and one line per Handle opened for writing. Each such line shows the
content and commit number of the version that was seen and the content of the
transaction's own copy.
The lines within each section come in map iteration order.
*/
func (self *Transaction) Describe() string {
	var buf bytes.Buffer
	// Commit numbers are written with atomic stores elsewhere, so they are loaded atomically here.
	fmt.Fprintf(&buf, "Transaction:%p (beginNumber: %v, commitNumber: %v):\n readHandles:\n", self, self.beginNumber, atomic.LoadUint64(&self.commitNumber))
	for _, snap := range self.readHandles {
		fmt.Fprintf(&buf, " %v (%v) => %v\n", snap.old.content, atomic.LoadUint64(&snap.old.commitNumber), snap.neu.content)
	}
	buf.WriteString(" writeHandles:\n")
	for _, snap := range self.writeHandles {
		fmt.Fprintf(&buf, " %v (%v) => %v\n", snap.old.content, atomic.LoadUint64(&snap.old.commitNumber), snap.neu.content)
	}
	return buf.String()
}
/*
Read opens h for reading and returns this Transaction's own copy of its content.
The copy is guaranteed to not have been changed since this Transaction started.
Any changes made to the return value will *not* be saved when the Transaction commits.
If another Transaction changes the data in h before this Transaction commits the commit will fail.

A Handle that this Transaction has already opened, for reading or for writing,
yields the copy made back then.
An error is returned if the Transaction is no longer undecided or if objRead fails for h.
*/
func (self *Transaction) Read(h *Handle) (rval Clonable, err error) {
	if status := self.getStatus(); status != undecided {
		return nil, fmt.Errorf("%v is not undecided", self)
	}
	// Already opened handles are looked up in the read set first, then in the write set.
	for _, opened := range [...]map[*Handle]*snapshot{self.readHandles, self.writeHandles} {
		if snap, found := opened[h]; found {
			return snap.neu.content, nil
		}
	}
	seen, err := self.objRead(h)
	if err != nil {
		return nil, err
	}
	private := seen.clone()
	self.readHandles[h] = &snapshot{old: seen, neu: private}
	return private.content, nil
}
/*
Write opens h for writing and returns this Transaction's own copy of its content.
The copy is guaranteed to not have been changed since this Transaction started.
All changes made to the return value *will* be saved when the Transaction commits.
If another Transaction changes the data in h before this Transaction commits the commit will fail.

A Handle already opened for writing yields the copy made back then. A Handle
already opened for reading is moved from the read set to the write set, keeping its copy.
An error is returned if the Transaction is no longer undecided or if objRead fails for h.
*/
func (self *Transaction) Write(h *Handle) (rval Clonable, err error) {
	if status := self.getStatus(); status != undecided {
		return nil, fmt.Errorf("%v is not undecided", self)
	}
	if owned, found := self.writeHandles[h]; found {
		return owned.neu.content, nil
	}
	if promoted, found := self.readHandles[h]; found {
		// Upgrade the read to a write, reusing the snapshot taken by Read.
		self.writeHandles[h] = promoted
		delete(self.readHandles, h)
		return promoted.neu.content, nil
	}
	seen, err := self.objRead(h)
	if err != nil {
		return nil, err
	}
	private := seen.clone()
	self.writeHandles[h] = &snapshot{old: seen, neu: private}
	return private.content, nil
}