Skip to content

Commit

Permalink
Merge pull request #61 from DataDog/cmasson/inner
Browse files Browse the repository at this point in the history
Expose inner DDSketch when tracking exact summary statistics
  • Loading branch information
CharlesMasson authored Jan 5, 2022
2 parents d19516c + 18b8c7d commit d93ea08
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 92 deletions.
107 changes: 51 additions & 56 deletions ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (
)

var (
errEmptySketch error = errors.New("no such element exists")
errUnknownFlag error = errors.New("unknown encoding flag")
ErrUntrackableNaN = errors.New("input value is NaN and cannot be tracked by the sketch")
ErrUntrackableTooLow = errors.New("input value is too low and cannot be tracked by the sketch")
ErrUntrackableTooHigh = errors.New("input value is too high and cannot be tracked by the sketch")
ErrNegativeCount = errors.New("count cannot be negative")
errEmptySketch = errors.New("no such element exists")
errUnknownFlag = errors.New("unknown encoding flag")
)

// Unexported to prevent usage and avoid the cost of dynamic dispatch
Expand Down Expand Up @@ -49,11 +53,9 @@ var _ quantileSketch = (*DDSketchWithExactSummaryStatistics)(nil)

type DDSketch struct {
mapping.IndexMapping
positiveValueStore store.Store
negativeValueStore store.Store
zeroCount float64
minIndexableAbsoluteValue float64
maxIndexableValue float64
positiveValueStore store.Store
negativeValueStore store.Store
zeroCount float64
}

func NewDDSketchFromStoreProvider(indexMapping mapping.IndexMapping, storeProvider store.Provider) *DDSketch {
Expand All @@ -62,11 +64,9 @@ func NewDDSketchFromStoreProvider(indexMapping mapping.IndexMapping, storeProvid

func NewDDSketch(indexMapping mapping.IndexMapping, positiveValueStore store.Store, negativeValueStore store.Store) *DDSketch {
return &DDSketch{
IndexMapping: indexMapping,
positiveValueStore: positiveValueStore,
negativeValueStore: negativeValueStore,
minIndexableAbsoluteValue: indexMapping.MinIndexableValue(),
maxIndexableValue: indexMapping.MaxIndexableValue(),
IndexMapping: indexMapping,
positiveValueStore: positiveValueStore,
negativeValueStore: negativeValueStore,
}
}

Expand Down Expand Up @@ -119,17 +119,22 @@ func (s *DDSketch) Add(value float64) error {

// Adds a value to the sketch with a float64 count.
func (s *DDSketch) AddWithCount(value, count float64) error {
if value < -s.maxIndexableValue || value > s.maxIndexableValue {
return errors.New("The input value is outside the range that is tracked by the sketch.")
}
if count < 0 {
return errors.New("The count cannot be negative.")
return ErrNegativeCount
}

if value > s.minIndexableAbsoluteValue {
if value > s.MinIndexableValue() {
if value > s.MaxIndexableValue() {
return ErrUntrackableTooHigh
}
s.positiveValueStore.AddWithCount(s.Index(value), count)
} else if value < -s.minIndexableAbsoluteValue {
} else if value < -s.MinIndexableValue() {
if value < -s.MaxIndexableValue() {
return ErrUntrackableTooLow
}
s.negativeValueStore.AddWithCount(s.Index(-value), count)
} else if math.IsNaN(value) {
return ErrUntrackableNaN
} else {
s.zeroCount += count
}
Expand All @@ -139,12 +144,10 @@ func (s *DDSketch) AddWithCount(value, count float64) error {
// Return a (deep) copy of this sketch.
func (s *DDSketch) Copy() *DDSketch {
return &DDSketch{
IndexMapping: s.IndexMapping,
positiveValueStore: s.positiveValueStore.Copy(),
negativeValueStore: s.negativeValueStore.Copy(),
zeroCount: s.zeroCount,
minIndexableAbsoluteValue: s.minIndexableAbsoluteValue,
maxIndexableValue: s.maxIndexableValue,
IndexMapping: s.IndexMapping,
positiveValueStore: s.positiveValueStore.Copy(),
negativeValueStore: s.negativeValueStore.Copy(),
zeroCount: s.zeroCount,
}
}

Expand Down Expand Up @@ -303,12 +306,10 @@ func FromProtoWithStoreProvider(pb *sketchpb.DDSketch, storeProvider store.Provi
return nil, err
}
return &DDSketch{
IndexMapping: m,
positiveValueStore: positiveValueStore,
negativeValueStore: negativeValueStore,
zeroCount: pb.ZeroCount,
minIndexableAbsoluteValue: m.MinIndexableValue(),
maxIndexableValue: m.MaxIndexableValue(),
IndexMapping: m,
positiveValueStore: positiveValueStore,
negativeValueStore: negativeValueStore,
zeroCount: pb.ZeroCount,
}, nil
}

Expand Down Expand Up @@ -420,8 +421,6 @@ func (s *DDSketch) decodeAndMergeWith(bb []byte, fallbackDecode func(b *[]byte,
if s.IndexMapping == nil {
return errors.New("missing index mapping")
}
s.minIndexableAbsoluteValue = s.IndexMapping.MinIndexableValue()
s.maxIndexableValue = s.IndexMapping.MaxIndexableValue()
return nil
}

Expand Down Expand Up @@ -484,7 +483,7 @@ func (s *DDSketch) Reweight(w float64) error {
// statistics. Because of the need to track them exactly, adding and merging
// operations are slightly more exepensive than those of DDSketch.
type DDSketchWithExactSummaryStatistics struct {
sketch *DDSketch
*DDSketch
summaryStatistics *stat.SummaryStatistics
}

Expand All @@ -494,14 +493,14 @@ func NewDefaultDDSketchWithExactSummaryStatistics(relativeAccuracy float64) (*DD
return nil, err
}
return &DDSketchWithExactSummaryStatistics{
sketch: sketch,
DDSketch: sketch,
summaryStatistics: stat.NewSummaryStatistics(),
}, nil
}

func NewDDSketchWithExactSummaryStatistics(mapping mapping.IndexMapping, storeProvider store.Provider) *DDSketchWithExactSummaryStatistics {
return &DDSketchWithExactSummaryStatistics{
sketch: NewDDSketchFromStoreProvider(mapping, storeProvider),
DDSketch: NewDDSketchFromStoreProvider(mapping, storeProvider),
summaryStatistics: stat.NewSummaryStatistics(),
}
}
Expand All @@ -512,15 +511,11 @@ func NewDDSketchWithExactSummaryStatisticsFromData(sketch *DDSketch, summaryStat
return nil, errors.New("sketch and summary statistics do not match")
}
return &DDSketchWithExactSummaryStatistics{
sketch: sketch,
DDSketch: sketch,
summaryStatistics: summaryStatistics,
}, nil
}

func (s *DDSketchWithExactSummaryStatistics) RelativeAccuracy() float64 {
return s.sketch.RelativeAccuracy()
}

func (s *DDSketchWithExactSummaryStatistics) IsEmpty() bool {
return s.summaryStatistics.Count() == 0
}
Expand All @@ -534,21 +529,21 @@ func (s *DDSketchWithExactSummaryStatistics) GetSum() float64 {
}

func (s *DDSketchWithExactSummaryStatistics) GetMinValue() (float64, error) {
if s.sketch.IsEmpty() {
if s.DDSketch.IsEmpty() {
return math.NaN(), errEmptySketch
}
return s.summaryStatistics.Min(), nil
}

func (s *DDSketchWithExactSummaryStatistics) GetMaxValue() (float64, error) {
if s.sketch.IsEmpty() {
if s.DDSketch.IsEmpty() {
return math.NaN(), errEmptySketch
}
return s.summaryStatistics.Max(), nil
}

func (s *DDSketchWithExactSummaryStatistics) GetValueAtQuantile(quantile float64) (float64, error) {
value, err := s.sketch.GetValueAtQuantile(quantile)
value, err := s.DDSketch.GetValueAtQuantile(quantile)
min := s.summaryStatistics.Min()
if value < min {
return min, err
Expand All @@ -561,7 +556,7 @@ func (s *DDSketchWithExactSummaryStatistics) GetValueAtQuantile(quantile float64
}

func (s *DDSketchWithExactSummaryStatistics) GetValuesAtQuantiles(quantiles []float64) ([]float64, error) {
values, err := s.sketch.GetValuesAtQuantiles(quantiles)
values, err := s.DDSketch.GetValuesAtQuantiles(quantiles)
min := s.summaryStatistics.Min()
max := s.summaryStatistics.Max()
for i := range values {
Expand All @@ -575,16 +570,16 @@ func (s *DDSketchWithExactSummaryStatistics) GetValuesAtQuantiles(quantiles []fl
}

func (s *DDSketchWithExactSummaryStatistics) ForEach(f func(value, count float64) (stop bool)) {
s.sketch.ForEach(f)
s.DDSketch.ForEach(f)
}

func (s *DDSketchWithExactSummaryStatistics) Clear() {
s.sketch.Clear()
s.DDSketch.Clear()
s.summaryStatistics.Clear()
}

func (s *DDSketchWithExactSummaryStatistics) Add(value float64) error {
err := s.sketch.Add(value)
err := s.DDSketch.Add(value)
if err != nil {
return err
}
Expand All @@ -596,7 +591,7 @@ func (s *DDSketchWithExactSummaryStatistics) AddWithCount(value, count float64)
if count == 0 {
return nil
}
err := s.sketch.AddWithCount(value, count)
err := s.DDSketch.AddWithCount(value, count)
if err != nil {
return err
}
Expand All @@ -605,7 +600,7 @@ func (s *DDSketchWithExactSummaryStatistics) AddWithCount(value, count float64)
}

func (s *DDSketchWithExactSummaryStatistics) MergeWith(o *DDSketchWithExactSummaryStatistics) error {
err := s.sketch.MergeWith(o.sketch)
err := s.DDSketch.MergeWith(o.DDSketch)
if err != nil {
return err
}
Expand All @@ -615,13 +610,13 @@ func (s *DDSketchWithExactSummaryStatistics) MergeWith(o *DDSketchWithExactSumma

func (s *DDSketchWithExactSummaryStatistics) Copy() *DDSketchWithExactSummaryStatistics {
return &DDSketchWithExactSummaryStatistics{
sketch: s.sketch.Copy(),
DDSketch: s.DDSketch.Copy(),
summaryStatistics: s.summaryStatistics.Copy(),
}
}

func (s *DDSketchWithExactSummaryStatistics) Reweight(factor float64) error {
err := s.sketch.Reweight(factor)
err := s.DDSketch.Reweight(factor)
if err != nil {
return err
}
Expand All @@ -633,7 +628,7 @@ func (s *DDSketchWithExactSummaryStatistics) ChangeMapping(newMapping mapping.In
summaryStatisticsCopy := s.summaryStatistics.Copy()
summaryStatisticsCopy.Rescale(scaleFactor)
return &DDSketchWithExactSummaryStatistics{
sketch: s.sketch.ChangeMapping(newMapping, storeProvider(), storeProvider(), scaleFactor),
DDSketch: s.DDSketch.ChangeMapping(newMapping, storeProvider(), storeProvider(), scaleFactor),
summaryStatistics: summaryStatisticsCopy,
}
}
Expand All @@ -655,7 +650,7 @@ func (s *DDSketchWithExactSummaryStatistics) Encode(b *[]byte, omitIndexMapping
enc.EncodeFlag(b, enc.FlagMax)
enc.EncodeFloat64LE(b, s.summaryStatistics.Max())
}
s.sketch.Encode(b, omitIndexMapping)
s.DDSketch.Encode(b, omitIndexMapping)
}

// DecodeDDSketchWithExactSummaryStatistics deserializes a sketch.
Expand All @@ -675,7 +670,7 @@ func (s *DDSketchWithExactSummaryStatistics) Encode(b *[]byte, omitIndexMapping
// it is empty), because it does not track exact summary statistics
func DecodeDDSketchWithExactSummaryStatistics(b []byte, storeProvider store.Provider, indexMapping mapping.IndexMapping) (*DDSketchWithExactSummaryStatistics, error) {
s := &DDSketchWithExactSummaryStatistics{
sketch: &DDSketch{
DDSketch: &DDSketch{
IndexMapping: indexMapping,
positiveValueStore: storeProvider(),
negativeValueStore: storeProvider(),
Expand All @@ -688,7 +683,7 @@ func DecodeDDSketchWithExactSummaryStatistics(b []byte, storeProvider store.Prov
}

func (s *DDSketchWithExactSummaryStatistics) DecodeAndMergeWith(bb []byte) error {
err := s.sketch.decodeAndMergeWith(bb, func(b *[]byte, flag enc.Flag) error {
err := s.DDSketch.decodeAndMergeWith(bb, func(b *[]byte, flag enc.Flag) error {
switch flag {
case enc.FlagCount:
count, err := enc.DecodeVarfloat64(b)
Expand Down Expand Up @@ -720,7 +715,7 @@ func (s *DDSketchWithExactSummaryStatistics) DecodeAndMergeWith(bb []byte) error
}
// It is assumed that if the count is encoded, other exact summary
// statistics are encoded as well, which is the case if Encode is used.
if s.summaryStatistics.Count() == 0 && !s.sketch.IsEmpty() {
if s.summaryStatistics.Count() == 0 && !s.DDSketch.IsEmpty() {
return errors.New("missing exact summary statistics")
}
return nil
Expand Down
8 changes: 8 additions & 0 deletions ddsketch/ddsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,14 @@ func TestForEach(t *testing.T) {
}
}

func TestErrors(t *testing.T) {
sketch, _ := LogUnboundedDenseDDSketch(0.01)
assert.Equal(t, ErrUntrackableTooLow, sketch.Add(math.Inf(-1)))
assert.Equal(t, ErrUntrackableTooHigh, sketch.Add(math.Inf(1)))
assert.Equal(t, ErrUntrackableNaN, sketch.Add(math.NaN()))
assert.Equal(t, ErrNegativeCount, sketch.AddWithCount(1, -1))
}

func TestDecodingErrors(t *testing.T) {
mapping1, _ := mapping.NewCubicallyInterpolatedMappingWithGamma(1.02, 0)
mapping2, _ := mapping.NewCubicallyInterpolatedMappingWithGamma(1.04, 0)
Expand Down
30 changes: 18 additions & 12 deletions ddsketch/mapping/cubically_interpolated_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ const (
// documentation of this method can be found in: <a
// href="https://github.com/DataDog/sketches-java/">sketches-java</a>
type CubicallyInterpolatedMapping struct {
gamma float64 // base
indexOffset float64
multiplier float64 // precomputed for performance
gamma float64 // base
indexOffset float64
multiplier float64 // precomputed for performance
minIndexableValue float64
maxIndexableValue float64
}

func NewCubicallyInterpolatedMapping(relativeAccuracy float64) (*CubicallyInterpolatedMapping, error) {
Expand All @@ -46,10 +48,20 @@ func NewCubicallyInterpolatedMappingWithGamma(gamma, indexOffset float64) (*Cubi
if gamma <= 1 {
return nil, errors.New("Gamma must be greater than 1.")
}
multiplier := 1 / math.Log2(gamma)
adjustedGamma := math.Pow(gamma, 7/(10*math.Ln2))
m := CubicallyInterpolatedMapping{
gamma: gamma,
indexOffset: indexOffset,
multiplier: 1 / math.Log2(gamma),
multiplier: multiplier,
minIndexableValue: math.Max(
math.Exp2((math.MinInt32-indexOffset)/multiplier+1), // so that index >= MinInt32
minNormalFloat64*adjustedGamma,
),
maxIndexableValue: math.Min(
math.Exp2((math.MaxInt32-indexOffset)/multiplier-1), // so that index <= MaxInt32
math.Exp(expOverflow)/(2*adjustedGamma)*(adjustedGamma+1), // so that math.Exp does not overflow
),
}
return &m, nil
}
Expand Down Expand Up @@ -100,17 +112,11 @@ func (m *CubicallyInterpolatedMapping) approximateInverseLog(x float64) float64
}

func (m *CubicallyInterpolatedMapping) MinIndexableValue() float64 {
return math.Max(
math.Exp2((math.MinInt32-m.indexOffset)/m.multiplier+1), // so that index >= MinInt32
minNormalFloat64*(1+m.RelativeAccuracy())/(1-m.RelativeAccuracy()),
)
return m.minIndexableValue
}

func (m *CubicallyInterpolatedMapping) MaxIndexableValue() float64 {
return math.Min(
math.Exp2((math.MaxInt32-m.indexOffset)/m.multiplier-1), // so that index <= MaxInt32
math.Exp(expOverflow)/(1+m.RelativeAccuracy()), // so that math.Exp does not overflow
)
return m.maxIndexableValue
}

func (m *CubicallyInterpolatedMapping) RelativeAccuracy() float64 {
Expand Down
2 changes: 2 additions & 0 deletions ddsketch/mapping/index_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ type IndexMapping interface {
Value(index int) float64
LowerBound(index int) float64
RelativeAccuracy() float64
// MinIndexableValue returns the minimum positive value that can be mapped to an index.
MinIndexableValue() float64
// MaxIndexableValue returns the maximum positive value that can be mapped to an index.
MaxIndexableValue() float64
ToProto() *sketchpb.IndexMapping
// Encode encodes a mapping and appends its content to the provided []byte.
Expand Down
Loading

0 comments on commit d93ea08

Please sign in to comment.