diff --git a/ddsketch/store/buffered_paginated.go b/ddsketch/store/buffered_paginated.go index f08bf55..9ea6a7c 100644 --- a/ddsketch/store/buffered_paginated.go +++ b/ddsketch/store/buffered_paginated.go @@ -376,24 +376,26 @@ func (s *BufferedPaginatedStore) minIndexWithCumulCount(predicate func(float64) func (s *BufferedPaginatedStore) MergeWith(other Store) { o, ok := other.(*BufferedPaginatedStore) - if ok && len(o.pages) == 0 { - // Optimized merging if the other store only has buffered data. - oBufferOffset := 0 - for { - bufferCapOverhead := max(cap(s.buffer), s.bufferCompactionTriggerLen) - len(s.buffer) - if bufferCapOverhead >= len(o.buffer)-oBufferOffset { - s.buffer = append(s.buffer, o.buffer[oBufferOffset:]...) - return + if ok && s.pageLenLog2 == o.pageLenLog2 { + // Merge pages. + for oPageOffset, oPage := range o.pages { + oPageIndex := o.minPageIndex + oPageOffset + page := s.page(oPageIndex, true) + for i, oCount := range oPage { + page[i] += oCount } - s.buffer = append(s.buffer, o.buffer[oBufferOffset:oBufferOffset+bufferCapOverhead]...) - oBufferOffset += bufferCapOverhead - s.compact() } - } - // Fallback merging. - for bin := range other.Bins() { - s.AddBin(bin) + // Merge buffers. + for _, index := range o.buffer { + s.Add(index) + } + } else { + // Fallback merging. + other.ForEach(func(index int, count float64) (stop bool) { + s.AddWithCount(index, count) + return false + }) } } @@ -541,9 +543,10 @@ func (s *BufferedPaginatedStore) ToProto() *sketchpb.Store { } // FIXME: add heuristic to use contiguousBinCounts when cheaper. binCounts := make(map[int32]float64) - for bin := range s.Bins() { - binCounts[int32(bin.index)] = bin.count - } + s.ForEach(func(index int, count float64) (stop bool) { + binCounts[int32(index)] = count + return false + }) return &sketchpb.Store{ BinCounts: binCounts, } diff --git a/ddsketch/store/collapsing_highest_dense_store.go b/ddsketch/store/collapsing_highest_dense_store.go index 4cc7832..2a431a1 100644 --- a/ddsketch/store/collapsing_highest_dense_store.go +++ b/ddsketch/store/collapsing_highest_dense_store.go @@ -136,9 +136,10 @@ func (s *CollapsingHighestDenseStore) MergeWith(other Store) { } o, ok := other.(*CollapsingHighestDenseStore) if !ok { - for bin := range other.Bins() { - s.AddBin(bin) - } + other.ForEach(func(index int, count float64) (stop bool) { + s.AddWithCount(index, count) + return false + }) return } if o.minIndex < s.minIndex || o.maxIndex > s.maxIndex { diff --git a/ddsketch/store/collapsing_lowest_dense_store.go b/ddsketch/store/collapsing_lowest_dense_store.go index 7549083..80ae2a5 100644 --- a/ddsketch/store/collapsing_lowest_dense_store.go +++ b/ddsketch/store/collapsing_lowest_dense_store.go @@ -141,9 +141,10 @@ func (s *CollapsingLowestDenseStore) MergeWith(other Store) { } o, ok := other.(*CollapsingLowestDenseStore) if !ok { - for bin := range other.Bins() { - s.AddBin(bin) - } + other.ForEach(func(index int, count float64) (stop bool) { + s.AddWithCount(index, count) + return false + }) return } if o.minIndex < s.minIndex || o.maxIndex > s.maxIndex { diff --git a/ddsketch/store/dense_store.go b/ddsketch/store/dense_store.go index 5d77e48..2c4a3d4 100644 --- a/ddsketch/store/dense_store.go +++ b/ddsketch/store/dense_store.go @@ -170,9 +170,10 @@ func (s *DenseStore) MergeWith(other Store) { } o, ok := other.(*DenseStore) if !ok { - for bin := range other.Bins() { - s.AddBin(bin) - } + other.ForEach(func(index int, count float64) (stop bool) { + s.AddWithCount(index, count) + return false + }) return } if o.minIndex < s.minIndex || o.maxIndex > s.maxIndex { diff --git a/ddsketch/store/sparse.go b/ddsketch/store/sparse.go index 310e4fc..9a07836 100644 --- a/ddsketch/store/sparse.go +++ b/ddsketch/store/sparse.go @@ -136,9 +136,10 @@ func (s *SparseStore) KeyAtRank(rank float64) int { } func (s *SparseStore) MergeWith(store Store) { - for bin := range store.Bins() { - s.AddBin(bin) - } + store.ForEach(func(index int, count float64) (stop bool) { + s.AddWithCount(index, count) + return false + }) } func (s *SparseStore) ToProto() *sketchpb.Store { diff --git a/ddsketch/store/store_test.go b/ddsketch/store/store_test.go index 9d4b3e1..7278100 100644 --- a/ddsketch/store/store_test.go +++ b/ddsketch/store/store_test.go @@ -1058,6 +1058,33 @@ func BenchmarkNewAndAddWithCountNorm(b *testing.B) { } } +func BenchmarkMergeWith(b *testing.B) { + numDistinctSketchesLog2 := 3 + for numIndexesLog10 := 0; numIndexesLog10 <= 6; numIndexesLog10++ { + numIndexes := int(math.Pow10(numIndexesLog10)) // per store + b.Run(fmt.Sprintf("1e%d", numIndexesLog10), func(b *testing.B) { + for _, testCase := range testCases { + stores := make([]Store, 1<