Skip to content

Commit

Permalink
fix: ring buffer deadlock (#511)
Browse files Browse the repository at this point in the history
Fix an issue where the ring buffer might deadlock because rwlock and
iterator mutex are acquired in a different order.

Fixes: #508

Notes:

1. As discussed with Harry and Ben, `rb.Write` isn't adjusted because
the rwlock and iterator mutex are acquired at different times.
2. Besides an internal `close` function, an internal `positions`
function is created for the same reason.
3. The [test case
here](#508 (comment))
is not added because it was supposed to be an easy way to reproduce the
deadlock. Adding an additional benchmark test is not useful, and the
deadlock issue can be caught by the `TestDeadlock` test case anyway.
4. All three functions `signalIterators`, `releaseIterators` and
`removeIterator` are adjusted (although technical only
`releaseIterators` needed to be ) because they share a common naming
pattern so it's nicer to share a common implementation pattern as well:
all internal functions don't operate locks.

Benchmark test, manual test to reproduce the 3-way deadlock issue, and
race test are all done and passed.
  • Loading branch information
IronCore864 authored Oct 18, 2024
1 parent c3cb594 commit ae91836
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
2 changes: 2 additions & 0 deletions internals/servicelog/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (it *iterator) Close() error {
if it.rb == nil {
return nil
}
it.rb.iteratorMutex.Lock()
defer it.rb.iteratorMutex.Unlock()
it.rb.removeIterator(it)
close(it.nextChan)
it.rb = nil
Expand Down
22 changes: 11 additions & 11 deletions internals/servicelog/ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func NewRingBuffer(size int) *RingBuffer {
func (rb *RingBuffer) Close() error {
rb.rwlock.Lock()
defer rb.rwlock.Unlock()
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
if rb.writeClosed {
return nil
}
Expand All @@ -86,6 +88,8 @@ func (rb *RingBuffer) Write(p []byte) (written int, err error) {
}
defer func() {
if written > 0 {
rb.iteratorMutex.RLock()
defer rb.iteratorMutex.RUnlock()
rb.signalIterators()
}
}()
Expand Down Expand Up @@ -237,16 +241,18 @@ func (rb *RingBuffer) WriteTo(writer io.Writer, start RingPos) (next RingPos, n

// TailIterator returns an iterator from the tail of the buffer.
func (rb *RingBuffer) TailIterator() Iterator {
rb.rwlock.RLock()
defer rb.rwlock.RUnlock()
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
start, _ := rb.Positions()
start := rb.readIndex
iter := &iterator{
rb: rb,
index: start,
nextChan: make(chan bool, 1),
closeChan: make(chan struct{}),
}
if rb.Closed() {
if rb.writeClosed {
close(iter.closeChan)
}
rb.iteratorList = append(rb.iteratorList, iter)
Expand All @@ -257,6 +263,8 @@ func (rb *RingBuffer) TailIterator() Iterator {
// If lines is greater than zero, the iterator will start that many lines
// backwards from the head.
func (rb *RingBuffer) HeadIterator(lines int) Iterator {
rb.rwlock.RLock()
defer rb.rwlock.RUnlock()
firstLine := rb.reverseLinePosition(lines)
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
Expand All @@ -266,16 +274,14 @@ func (rb *RingBuffer) HeadIterator(lines int) Iterator {
nextChan: make(chan bool, 1),
closeChan: make(chan struct{}),
}
if rb.Closed() {
if rb.writeClosed {
close(iter.closeChan)
}
rb.iteratorList = append(rb.iteratorList, iter)
return iter
}

func (rb *RingBuffer) reverseLinePosition(n int) RingPos {
rb.rwlock.RLock()
defer rb.rwlock.RUnlock()
if n <= 0 {
return rb.writeIndex
}
Expand Down Expand Up @@ -322,8 +328,6 @@ func (rb *RingBuffer) discard(n int) error {
}

func (rb *RingBuffer) signalIterators() {
rb.iteratorMutex.RLock()
defer rb.iteratorMutex.RUnlock()
for _, iter := range rb.iteratorList {
select {
case iter.nextChan <- true:
Expand All @@ -339,8 +343,6 @@ func (rb *RingBuffer) signalIterators() {
}

func (rb *RingBuffer) releaseIterators() {
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
for _, iter := range rb.iteratorList {
// Close closeChan if not already closed
select {
Expand All @@ -353,8 +355,6 @@ func (rb *RingBuffer) releaseIterators() {
}

func (rb *RingBuffer) removeIterator(iter *iterator) {
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
for i, storedIter := range rb.iteratorList {
if iter != storedIter {
continue
Expand Down

0 comments on commit ae91836

Please sign in to comment.