-
Notifications
You must be signed in to change notification settings - Fork 2
/
taglocker.go
283 lines (240 loc) · 7.89 KB
/
taglocker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package plc
import (
"fmt"
"sync"
"github.com/dijkstracula/go-ilock"
)
// TagLockerOperationError is returned when one or more atomic operations on
// the tag locker in a "serialized" operation have failed. Since a "transaction"
// on the tag tree can yield multiple errors, a TagLockerFailure is the
// composition of one or more individual errors.
type TagLockerOperationError error
func lockError(e error) TagLockerOperationError {
return fmt.Errorf("Error locking tag locker: %v", e)
}
func unlockError(e error) TagLockerOperationError {
return fmt.Errorf("Error unlocking tag locker: %v", e)
}
func rLockError(e error) TagLockerOperationError {
return fmt.Errorf("Error read-locking tag locker: %v", e)
}
func rUnlockError(e error) TagLockerOperationError {
return fmt.Errorf("Error read-unlocking tag locker: %v", e)
}
// TagLocker is a plc.ReadWriter that wraps another ReadWriter, but gates
// concurrent accesses on grabbing read or write access on a tree of locks
// representing tag names (in the case of tree leaf nodes) and prefixes of tag
// names (in the case of tree frond nodes).
type TagLocker struct {
downstream ReadWriter
tagTree *tagLockerNode
}
// ReadTag reads the given tag name from the downstream ReadWriter. If another
// thread is concurrently writing to this tag or a prefix of the tag, we will
// block until that thread has released its access.
func (tl *TagLocker) ReadTag(name string, value interface{}) (err error) {
components, err := ParseQualifiedTagName(name)
if err != nil {
return
}
err = tl.tagTree.rLock(components)
if err != nil {
err = rLockError(err)
return
}
defer func() {
unlockErr := tl.tagTree.rUnlock(components)
if unlockErr != nil {
err = rUnlockError(unlockErr)
return
}
}()
err = tl.downstream.ReadTag(name, value)
if err != nil {
return
}
return
}
// WriteTag writes the given tag value to the downstream ReadWriter. Will block
// if another thread is reading this tag or a prefix of the tag.
func (tl *TagLocker) WriteTag(name string, value interface{}) (err error) {
components, err := ParseQualifiedTagName(name)
if err != nil {
return
}
err = tl.tagTree.lock(components)
if err != nil {
err = lockError(err)
return
}
defer func() {
unlockErr := tl.tagTree.unlock(components)
if unlockErr != nil {
err = unlockError(unlockErr)
return
}
}()
err = tl.downstream.WriteTag(name, value)
if err != nil {
return
}
return
}
type tagLockerNode struct {
mtx sync.RWMutex // Ensures mutual exclusion on the fields of the node.
tagLock *ilock.Mutex // The logical lock that mutator threads will hold while reading and writing tags.
component string // The component of the tag name.
children map[string]*tagLockerNode // All descendents of this node.
}
// NewTagLocker produces a new TagLocker.
func NewTagLocker(downstream ReadWriter) *TagLocker {
return &TagLocker{
downstream: downstream,
tagTree: newNode("/"),
}
}
func newNode(component string) *tagLockerNode {
return &tagLockerNode{
tagLock: ilock.New(),
mtx: sync.RWMutex{},
component: component,
children: make(map[string]*tagLockerNode),
}
}
// getOrCreate atomically returns the child of `tn` with the supplied
// component name; or, creates and inserts a child with that name. In
// either case, the child in question is returned.
// Assumes that tn.mtx is _not_ held!
func (tn *tagLockerNode) getOrCreateChild(component string) *tagLockerNode {
tn.mtx.RLock()
child, ok := tn.children[component]
// Do we already have a child component with the current component
// name? If so, just recurse on that.
if ok {
tn.mtx.RUnlock()
return child
}
// The child does not exist. Upgrade to a writer lock in order
// to insert a new child into our set of children.
tn.mtx.RUnlock()
tn.mtx.Lock()
// Check again to see that nobody beat us to creating that child.
child, ok = tn.children[component]
if ok {
// Lucky us!
tn.mtx.Unlock()
return child
}
// Okay, we have no choice but to create the child ourselves.
child = newNode(component)
tn.children[component] = child
// Release our own lock and recurse on the child.
tn.mtx.Unlock()
return child
}
// lock traverses the slice of components, setting intention writer
// locks along the branch of the lock tree, until it reaches the final
// tag component. There, it grabs an exclusive writer lock on that node.
func (tn *tagLockerNode) lock(components []string) error {
// If we have no paths to traverse, lock ourselves!
if len(components) == 0 {
//fmt.Fprintf(os.Stderr, "XLock %v\n", tn.component)
tn.tagLock.XLock()
return nil
}
// Otherwise, we are only part of the way to the final component
// to lock. Take an intent read lock on this node.
//fmt.Fprintf(os.Stderr, "IXLock %v\n", tn.component)
tn.tagLock.IXLock()
currentComp := components[0]
remainingComp := components[1:]
return tn.getOrCreateChild(currentComp).lock(remainingComp)
}
// rLock traverses the slice of components, setting intention reader
// locks along the branch of the lock tree, until it reaches the final
// tag component. There, it grabs a reader lock on that node.
func (tn *tagLockerNode) rLock(components []string) error {
// If we have no paths to traverse, lock ourselves!
if len(components) == 0 {
//fmt.Fprintf(os.Stderr, "SLock %v\n", tn.component)
tn.tagLock.SLock()
return nil
}
// Otherwise, we are only part of the way to the final component
// to lock. Take an intent read lock on this node.
//fmt.Fprintf(os.Stderr, "ISLock %v\n", tn.component)
tn.tagLock.ISLock()
currentComp := components[0]
remainingComp := components[1:]
return tn.getOrCreateChild(currentComp).rLock(remainingComp)
}
// rUnlock unlocks a path that has already been previously
// locked for reader access. It decrements the reader count on
// the final component in the path and removes a read intention
// on all other paths.
func (tn *tagLockerNode) rUnlock(components []string) error {
// If we have no paths to traverse, unlock ourselves!
if len(components) == 0 {
//fmt.Fprintf(os.Stderr, "SUnLock %v\n", tn.component)
tn.tagLock.SUnlock()
return nil
}
currentComp := components[0]
remainingComp := components[1:]
defer func() {
//fmt.Fprintf(os.Stderr, "ISUnLock %v\n", tn.component)
tn.tagLock.ISUnlock()
}()
// Unlock our children first - we want to unlock in the opposite
// order that we acquired the locks.
tn.mtx.RLock()
child, ok := tn.children[currentComp]
tn.mtx.RUnlock()
if !ok {
return fmt.Errorf("missing component %v", currentComp)
}
err := child.rUnlock(remainingComp)
if err != nil {
// TODO: what is the right thing to do here? Should we
// attempt to unlock ourselves even if children failed to
// unlock correctly? Either doing so or not doing so seems
// dangerous.
return err
}
return nil
}
// rUnlock unlocks a path that has already been previously
// locked for writer access. It decrements the writer count on
// the final component in the path and removes a write intention
// on all other paths.
func (tn *tagLockerNode) unlock(components []string) error {
// If we have no paths to traverse, unlock ourselves!
if len(components) == 0 {
//fmt.Fprintf(os.Stderr, "XUnLock %v\n", tn.component)
tn.tagLock.XUnlock()
return nil
}
currentComp := components[0]
remainingComp := components[1:]
defer func() {
//fmt.Fprintf(os.Stderr, "IXUnLock %v\n", tn.component)
tn.tagLock.IXUnlock()
}()
// Unlock our children first - we want to unlock in the opposite
// order that we acquired the locks.
tn.mtx.RLock()
child, ok := tn.children[currentComp]
tn.mtx.RUnlock()
if !ok {
return fmt.Errorf("missing component %v", currentComp)
}
err := child.unlock(remainingComp)
if err != nil {
// TODO: what is the right thing to do here? Should we
// attempt to unlock ourselves even if children failed to
// unlock correctly? Either doing so or not doing so seems
// dangerous.
return err
}
return nil
}