Skip to content

Commit

Permalink
Optimize buffered paginated store merging
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesMasson committed Jan 5, 2022
1 parent 53f56a1 commit 1693ec9
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions ddsketch/store/buffered_paginated.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,26 +376,27 @@ func (s *BufferedPaginatedStore) minIndexWithCumulCount(predicate func(float64)

func (s *BufferedPaginatedStore) MergeWith(other Store) {
o, ok := other.(*BufferedPaginatedStore)
if ok && len(o.pages) == 0 {
// Optimized merging if the other store only has buffered data.
oBufferOffset := 0
for {
bufferCapOverhead := max(cap(s.buffer), s.bufferCompactionTriggerLen) - len(s.buffer)
if bufferCapOverhead >= len(o.buffer)-oBufferOffset {
s.buffer = append(s.buffer, o.buffer[oBufferOffset:]...)
return
if ok && s.pageLenLog2 == o.pageLenLog2 {
// Merge pages.
for oPageOffset, oPage := range o.pages {
oPageIndex := o.minPageIndex + oPageOffset
page := s.page(oPageIndex, true)
for i, oCount := range oPage {
page[i] += oCount
}
s.buffer = append(s.buffer, o.buffer[oBufferOffset:oBufferOffset+bufferCapOverhead]...)
oBufferOffset += bufferCapOverhead
s.compact()
}
}

// Fallback merging.
other.ForEach(func(index int, count float64) (stop bool) {
s.AddWithCount(index, count)
return false
})
// Merge buffers.
for _, index := range o.buffer {
s.Add(index)
}
} else {
// Fallback merging.
other.ForEach(func(index int, count float64) (stop bool) {
s.AddWithCount(index, count)
return false
})
}
}

func (s *BufferedPaginatedStore) MergeWithProto(pb *sketchpb.Store) {
Expand Down

0 comments on commit 1693ec9

Please sign in to comment.