Skip to content

Commit

Permalink
Merge pull request #329 from liftbridge-io/fixes
Browse files Browse the repository at this point in the history
Fix replication notify flood leading to high CPU usage
  • Loading branch information
tylertreat authored Feb 25, 2021
2 parents 76757fe + cc2ea9c commit 8d1fff0
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 12 deletions.
4 changes: 2 additions & 2 deletions server/commitlog/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ func (l *commitLog) Segments() []*segment {
// the given log end offset are added to the log. If the given offset is no
// longer the log end offset, the channel is closed immediately. Waiter is an
// opaque value that uniquely identifies the entity waiting for data.
func (l *commitLog) NotifyLEO(waiter interface{}, leo int64) <-chan struct{} {
return l.activeSegment().WaitForLEO(waiter, leo)
func (l *commitLog) NotifyLEO(waiter interface{}, expectedLEO int64) <-chan struct{} {
	// Pass the log's current newest offset alongside the caller's expected
	// LEO so WaitForLEO can detect a stale expectation and close the
	// returned channel immediately instead of blocking.
	return l.activeSegment().WaitForLEO(waiter, expectedLEO, l.NewestOffset())
}

// SetReadonly marks the log as readonly. When in readonly mode, new messages
Expand Down
18 changes: 14 additions & 4 deletions server/commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,15 @@ func (s *segment) CheckSplit(logRollTime time.Duration) bool {
}

// Seal a segment from being written to. This is called on the former active
// segment after a new segment is rolled. This is a no-op if the segment is
// already sealed.
// segment after a new segment is rolled or when the segment is closed. This is
// a no-op if the segment is already sealed.
func (s *segment) Seal() {
	s.Lock()
	defer s.Unlock()
	// The actual sealing logic lives in the unexported seal, presumably so
	// internal callers that already hold the lock (close invokes s.seal()
	// directly) can seal without re-acquiring it — TODO confirm.
	s.seal()
}

func (s *segment) seal() {
if s.sealed {
return
}
Expand Down Expand Up @@ -266,10 +270,15 @@ func (s *segment) notifyWaiters() {
}
}

func (s *segment) WaitForLEO(waiter interface{}, leo int64) <-chan struct{} {
func (s *segment) WaitForLEO(waiter interface{}, expectedLEO, actualLEO int64) <-chan struct{} {
s.Lock()
defer s.Unlock()
if s.lastOffset != leo {
// Check expected LEO against last known LEO and against the current
// (active) segment's last offset in case the LEO changed since we last
// checked it. If the current segment's last offset is -1, this means the
// segment is empty and we should wait for data.
if expectedLEO != actualLEO || (expectedLEO != s.lastOffset && s.lastOffset != -1) {
// LEO has since changed so close channel immediately.
ch := make(chan struct{})
close(ch)
return ch
Expand Down Expand Up @@ -324,6 +333,7 @@ func (s *segment) close() error {
return err
}
s.closed = true
s.seal()
return nil
}

Expand Down
76 changes: 76 additions & 0 deletions server/commitlog/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,79 @@ func TestSegmentSealIdempotent(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(256), stats.Size())
}

// Ensure WaitForLEO returns a channel that is closed immediately when the
// expected LEO differs from the actual LEO. Otherwise returns a channel that
// is closed when the LEO changes or the segment is sealed.
func TestSegmentWaitForLEO(t *testing.T) {
	dir := tempDir(t)
	defer remove(t, dir)

	seg := createSegment(t, dir, 0, 100)

	// requireClosed fails the test unless ch is closed within a second.
	requireClosed := func(ch <-chan struct{}) {
		t.Helper()
		select {
		case <-ch:
		case <-time.After(time.Second):
			t.Fatal("Expected channel to be closed")
		}
	}
	// requireOpen fails the test if ch has already been closed.
	requireOpen := func(ch <-chan struct{}) {
		t.Helper()
		select {
		case <-ch:
			t.Fatal("Channel was unexpectedly closed")
		default:
		}
	}

	// Expected LEO differs from the actual LEO: channel closes immediately.
	requireClosed(seg.WaitForLEO(struct{}{}, 0, 1))

	require.NoError(t, seg.WriteMessageSet(make([]byte, 5), []*entry{{Offset: 0, Size: 5}}))
	require.NoError(t, seg.WriteMessageSet(make([]byte, 5), []*entry{{Offset: 1, Size: 5}}))

	// Expected LEO equals the last known LEO but differs from the active
	// segment's last offset: channel closes immediately.
	requireClosed(seg.WaitForLEO(struct{}{}, 0, 0))

	// Expected matches actual: channel stays open until the segment is
	// written to.
	ch := seg.WaitForLEO(struct{}{}, 1, 1)
	requireOpen(ch)
	require.NoError(t, seg.WriteMessageSet(make([]byte, 5), []*entry{{Offset: 2, Size: 5}}))
	requireClosed(ch)

	// Channel stays open until the segment is sealed.
	ch = seg.WaitForLEO(struct{}{}, 2, 2)
	requireOpen(ch)
	seg.Seal()
	requireClosed(ch)

	require.NoError(t, seg.WriteMessageSet(make([]byte, 100), []*entry{{Offset: 3, Size: 100}}))

	// Segment is now full, so the channel closes immediately.
	requireClosed(seg.WaitForLEO(struct{}{}, 3, 3))
}
13 changes: 7 additions & 6 deletions server/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ type leaderReport struct {
// of replicas have reported the leader, a new leader will be selected.
// Otherwise, the expiration timer is reset. An error is returned if selecting
// a new leader fails.
func (l *leaderReport) addWitness(replica string) *status.Status {
func (l *leaderReport) addWitness(ctx context.Context, replica string) *status.Status {
l.mu.Lock()
defer l.mu.Unlock()

l.witnessReplicas[replica] = struct{}{}

Expand All @@ -69,7 +68,8 @@ func (l *leaderReport) addWitness(replica string) *status.Status {
if l.timer != nil {
l.timer.Stop()
}
return l.api.electNewPartitionLeader(l.partition)
l.mu.Unlock()
return l.api.electNewPartitionLeader(ctx, l.partition)
}

if l.timer != nil {
Expand All @@ -82,6 +82,7 @@ func (l *leaderReport) addWitness(replica string) *status.Status {
l.api.mu.Unlock()
})
}
l.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -670,7 +671,7 @@ func (m *metadataAPI) ReportLeader(ctx context.Context, req *proto.ReportLeaderO
}
m.mu.Unlock()

return reported.addWitness(req.Replica)
return reported.addWitness(ctx, req.Replica)
}

// SetStreamReadonly sets a stream's readonly flag if this server is the
Expand Down Expand Up @@ -1101,7 +1102,7 @@ func (m *metadataAPI) getClusterServerIDs() ([]string, error) {
// electNewPartitionLeader selects a new leader for the given partition,
// applies this update to the Raft group, and notifies the replica set. This
// will fail if the current broker is not the metadata leader.
func (m *metadataAPI) electNewPartitionLeader(partition *partition) *status.Status {
func (m *metadataAPI) electNewPartitionLeader(ctx context.Context, partition *partition) *status.Status {
isr := partition.GetISR()
// TODO: add support for "unclean" leader elections.
if len(isr) <= 1 {
Expand Down Expand Up @@ -1135,7 +1136,7 @@ func (m *metadataAPI) electNewPartitionLeader(partition *partition) *status.Stat
}

// Wait on result of replication.
future, err := m.getRaft().applyOperation(context.TODO(), op, m.checkChangeLeaderPreconditions)
future, err := m.getRaft().applyOperation(ctx, op, m.checkChangeLeaderPreconditions)
if err != nil {
return status.Newf(codes.FailedPrecondition, err.Error())
}
Expand Down
24 changes: 24 additions & 0 deletions website/i18n/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,30 @@
},
"version-v1.4.1/version-v1.4.1-quick-start": {
"title": "Quick Start"
},
"version-v1.5.0/version-v1.5.0-client-implementation": {
"title": "Client Implementation Guidance"
},
"version-v1.5.0/version-v1.5.0-concepts": {
"title": "Concepts"
},
"version-v1.5.0/version-v1.5.0-configuration": {
"title": "Configuration"
},
"version-v1.5.0/version-v1.5.0-embedded-nats": {
"title": "Embedded NATS"
},
"version-v1.5.0/version-v1.5.0-quick-start": {
"title": "Quick Start"
},
"version-v1.5.0/version-v1.5.0-replication-protocol": {
"title": "Replication Protocol"
},
"version-v1.5.0/version-v1.5.0-roadmap": {
"title": "Product Roadmap"
},
"version-v1.5.0/version-v1.5.0-scalability-configuration": {
"title": "Configuring for Scalability"
}
},
"links": {
Expand Down

0 comments on commit 8d1fff0

Please sign in to comment.