Skip to content

Commit

Permalink
Merge pull request #60 from DataDog/cmasson/buffered_paginated_merging
Browse files Browse the repository at this point in the history
Optimize buffered paginated store merging
  • Loading branch information
CharlesMasson authored Jan 6, 2022
2 parents d93ea08 + 1693ec9 commit 2507e6f
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 30 deletions.
39 changes: 21 additions & 18 deletions ddsketch/store/buffered_paginated.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,24 +376,26 @@ func (s *BufferedPaginatedStore) minIndexWithCumulCount(predicate func(float64)

func (s *BufferedPaginatedStore) MergeWith(other Store) {
o, ok := other.(*BufferedPaginatedStore)
if ok && len(o.pages) == 0 {
// Optimized merging if the other store only has buffered data.
oBufferOffset := 0
for {
bufferCapOverhead := max(cap(s.buffer), s.bufferCompactionTriggerLen) - len(s.buffer)
if bufferCapOverhead >= len(o.buffer)-oBufferOffset {
s.buffer = append(s.buffer, o.buffer[oBufferOffset:]...)
return
if ok && s.pageLenLog2 == o.pageLenLog2 {
// Merge pages.
for oPageOffset, oPage := range o.pages {
oPageIndex := o.minPageIndex + oPageOffset
page := s.page(oPageIndex, true)
for i, oCount := range oPage {
page[i] += oCount
}
s.buffer = append(s.buffer, o.buffer[oBufferOffset:oBufferOffset+bufferCapOverhead]...)
oBufferOffset += bufferCapOverhead
s.compact()
}
}

// Fallback merging.
for bin := range other.Bins() {
s.AddBin(bin)
// Merge buffers.
for _, index := range o.buffer {
s.Add(index)
}
} else {
// Fallback merging.
other.ForEach(func(index int, count float64) (stop bool) {
s.AddWithCount(index, count)
return false
})
}
}

Expand Down Expand Up @@ -541,9 +543,10 @@ func (s *BufferedPaginatedStore) ToProto() *sketchpb.Store {
}
// FIXME: add heuristic to use contiguousBinCounts when cheaper.
binCounts := make(map[int32]float64)
for bin := range s.Bins() {
binCounts[int32(bin.index)] = bin.count
}
s.ForEach(func(index int, count float64) (stop bool) {
binCounts[int32(index)] = count
return false
})
return &sketchpb.Store{
BinCounts: binCounts,
}
Expand Down
7 changes: 4 additions & 3 deletions ddsketch/store/collapsing_highest_dense_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,10 @@ func (s *CollapsingHighestDenseStore) MergeWith(other Store) {
}
o, ok := other.(*CollapsingHighestDenseStore)
if !ok {
for bin := range other.Bins() {
s.AddBin(bin)
}
other.ForEach(func(index int, count float64) (stop bool) {
s.AddWithCount(index, count)
return false
})
return
}
if o.minIndex < s.minIndex || o.maxIndex > s.maxIndex {
Expand Down
7 changes: 4 additions & 3 deletions ddsketch/store/collapsing_lowest_dense_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,10 @@ func (s *CollapsingLowestDenseStore) MergeWith(other Store) {
}
o, ok := other.(*CollapsingLowestDenseStore)
if !ok {
for bin := range other.Bins() {
s.AddBin(bin)
}
other.ForEach(func(index int, count float64) (stop bool) {
s.AddWithCount(index, count)
return false
})
return
}
if o.minIndex < s.minIndex || o.maxIndex > s.maxIndex {
Expand Down
7 changes: 4 additions & 3 deletions ddsketch/store/dense_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ func (s *DenseStore) MergeWith(other Store) {
}
o, ok := other.(*DenseStore)
if !ok {
for bin := range other.Bins() {
s.AddBin(bin)
}
other.ForEach(func(index int, count float64) (stop bool) {
s.AddWithCount(index, count)
return false
})
return
}
if o.minIndex < s.minIndex || o.maxIndex > s.maxIndex {
Expand Down
7 changes: 4 additions & 3 deletions ddsketch/store/sparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,10 @@ func (s *SparseStore) KeyAtRank(rank float64) int {
}

func (s *SparseStore) MergeWith(store Store) {
for bin := range store.Bins() {
s.AddBin(bin)
}
store.ForEach(func(index int, count float64) (stop bool) {
s.AddWithCount(index, count)
return false
})
}

func (s *SparseStore) ToProto() *sketchpb.Store {
Expand Down
27 changes: 27 additions & 0 deletions ddsketch/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,33 @@ func BenchmarkNewAndAddWithCountNorm(b *testing.B) {
}
}

func BenchmarkMergeWith(b *testing.B) {
numDistinctSketchesLog2 := 3
for numIndexesLog10 := 0; numIndexesLog10 <= 6; numIndexesLog10++ {
numIndexes := int(math.Pow10(numIndexesLog10)) // per store
b.Run(fmt.Sprintf("1e%d", numIndexesLog10), func(b *testing.B) {
for _, testCase := range testCases {
stores := make([]Store, 1<<numDistinctSketchesLog2)
for i := range stores {
stores[i] = testCase.newStore()
for j := 0; j < numIndexes; j++ {
stores[i].Add(int(rand.NormFloat64() * 200))
}
}
store := testCase.newStore()
b.Run(testCase.name, func(b *testing.B) {
// Note that this is not ideal given that the computational cost
// of merging may vary with the number of stores already merged.
for i := 0; i < b.N; i++ {
store.MergeWith(stores[i&((1<<3)-1)])
}
})
sink = store
}
})
}
}

func TestBenchmarkSize(t *testing.T) {
for numIndexesLog10 := 0; numIndexesLog10 <= 7; numIndexesLog10++ {
numIndexes := int(math.Pow10(numIndexesLog10))
Expand Down

0 comments on commit 2507e6f

Please sign in to comment.