Merge pull request #29 from RedisBloom/weights.longlong
T-Digest weights are now long long
filipecosta90 committed Oct 12, 2022
2 parents be956a8 + 2329577 commit 9dcd73d
Showing 4 changed files with 173 additions and 204 deletions.
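For orientation before the diff: a minimal caller sketch against the changed public API. The signatures (long long weight in td_add, long long return from td_size, non-zero/EDOM on overflow) are taken from this diff; the tdigest.h header name and build setup are assumptions.

    #include <errno.h>
    #include <stdio.h>
    #include "tdigest.h"

    int main(void) {
        td_histogram_t *t = NULL;
        if (td_init(100, &t) != 0)
            return 1;
        /* weights are now long long, so large exact counts can be added in one call */
        if (td_add(t, 42.0, 1000000000000LL) != 0) {
            fprintf(stderr, "td_add rejected the weight (overflow)\n");
            td_free(t);
            return 1;
        }
        printf("size=%lld p50=%f\n", td_size(t), td_quantile(t, 0.5));
        td_free(t);
        return 0;
    }
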
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -33,8 +33,8 @@ if(ENABLE_SANITIZERS)
message(STATUS "Forcing build type to Debug to run coverage.")
set(CMAKE_BUILD_TYPE "Debug" CACHE
STRING "Choose the type of build." FORCE)
-set (CMAKE_C_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address")
-set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address")
+set (CMAKE_C_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wshadow -Wpointer-arith -Wcast-qual -Wunused -Wstrict-prototypes -Wmissing-prototypes -Wwrite-strings -Werror -fno-omit-frame-pointer -fsanitize=address")
+set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wshadow -Wpointer-arith -Wcast-qual -Wunused -Wstrict-prototypes -Wmissing-prototypes -Wwrite-strings -Werror -fno-omit-frame-pointer -fsanitize=address")
set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address")
ENDIF()

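Because -Werror is part of the new flag set, these warnings now fail the sanitizer Debug build outright. A hypothetical snippet (not from this repository) illustrating the kind of code the added flags reject:

    void reset();                /* -Wstrict-prototypes: declaration is not a prototype; use void reset(void) */

    int scale(int n) {           /* -Wmissing-prototypes: extern definition with no prior prototype */
        int i = n;
        {
            int i = 0;           /* -Wshadow: inner i shadows the outer i */
            (void)i;
        }
        return i;
    }

    char *greeting = "hi";       /* -Wwrite-strings: string literal bound to non-const char * */
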
139 changes: 77 additions & 62 deletions src/tdigest.c
@@ -20,6 +20,14 @@ static inline double weighted_average_sorted(double x1, double w1, double x2, do
return __td_max(x1, __td_min(x, x2));
}

+static inline bool _tdigest_long_long_add_safe(long long a, long long b) {
+if (b < 0) {
+return (a >= __LONG_LONG_MAX__ - b);
+} else {
+return (a <= __LONG_LONG_MAX__ - b);
+}
+}
+
static inline double weighted_average(double x1, double w1, double x2, double w2) {
if (x1 <= x2) {
return weighted_average_sorted(x1, w1, x2, w2);
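A standalone sanity check of what the new guard enforces, shown for the b >= 0 case (sketch; LLONG_MAX from <limits.h> is the portable spelling of the __LONG_LONG_MAX__ builtin used above):

    #include <assert.h>
    #include <limits.h>
    #include <stdbool.h>

    /* mirrors the b >= 0 branch of _tdigest_long_long_add_safe */
    static bool add_safe_nonneg(long long a, long long b) { return a <= LLONG_MAX - b; }

    int main(void) {
        assert(add_safe_nonneg(LLONG_MAX - 1, 1));  /* sum lands exactly on LLONG_MAX: allowed */
        assert(!add_safe_nonneg(LLONG_MAX, 1));     /* sum would wrap: rejected, td_add reports EDOM */
        return 0;
    }
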
@@ -34,11 +42,17 @@ static void inline swap(double *arr, int i, int j) {
arr[j] = temp;
}

-static unsigned int partition(double *means, double *weights, unsigned int start, unsigned int end,
-unsigned int pivot_idx) {
+static void inline swap_l(long long *arr, int i, int j) {
+const long long temp = arr[i];
+arr[i] = arr[j];
+arr[j] = temp;
+}
+
+static unsigned int partition(double *means, long long *weights, unsigned int start,
+unsigned int end, unsigned int pivot_idx) {
const double pivotMean = means[pivot_idx];
swap(means, pivot_idx, end);
-swap(weights, pivot_idx, end);
+swap_l(weights, pivot_idx, end);

int i = start - 1;

@@ -48,11 +62,11 @@ static unsigned int partition(double *means, double *weights, unsigned int start
// increment index of smaller element
i++;
swap(means, i, j);
-swap(weights, i, j);
+swap_l(weights, i, j);
}
}
swap(means, i + 1, end);
-swap(weights, i + 1, end);
+swap_l(weights, i + 1, end);
return i + 1;
}

@@ -64,13 +78,13 @@ static unsigned int partition(double *means, double *weights, unsigned int start
* @param start The beginning of the values to sort
* @param end The value after the last value to sort
*/
-void td_qsort(double *means, double *weights, unsigned int start, unsigned int end) {
+static void td_qsort(double *means, long long *weights, unsigned int start, unsigned int end) {
if (start < end) {
// two elements can be directly compared
if ((end - start) == 1) {
if (means[start] > means[end]) {
swap(means, start, end);
-swap(weights, start, end);
+swap_l(weights, start, end);
}
return;
}
@@ -160,7 +174,7 @@ int td_init(double compression, td_histogram_t **result) {
td_free(histogram);
return 1;
}
-histogram->nodes_weight = (double *)__td_calloc(capacity, sizeof(double));
+histogram->nodes_weight = (long long *)__td_calloc(capacity, sizeof(long long));
if (!histogram->nodes_weight) {
td_free(histogram);
return 1;
@@ -187,19 +201,22 @@ void td_free(td_histogram_t *histogram) {
}

int td_merge(td_histogram_t *into, td_histogram_t *from) {
-td_compress(into);
-td_compress(from);
-for (int i = 0; i < from->merged_nodes; i++) {
+if (td_compress(into) != 0)
+return EDOM;
+if (td_compress(from) != 0)
+return EDOM;
+const int pos = from->merged_nodes + from->unmerged_nodes;
+for (int i = 0; i < pos; i++) {
const double mean = from->nodes_mean[i];
-const double count = from->nodes_weight[i];
-if (td_add(into, mean, count) != 0) {
+const long long weight = from->nodes_weight[i];
+if (td_add(into, mean, weight) != 0) {
return EDOM;
}
}
return 0;
}
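Since td_merge now propagates failures instead of merging unconditionally, callers should check its result. A minimal wrapper sketch (the tdigest.h header name is an assumption; EDOM is the error value used in the paths visible here):

    #include <errno.h>
    #include <stdio.h>
    #include "tdigest.h"

    static int combine(td_histogram_t *into, td_histogram_t *from) {
        const int rc = td_merge(into, from);
        if (rc == EDOM)
            fprintf(stderr, "merge rejected: combined weight would overflow\n");
        return rc;
    }
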

-double td_size(td_histogram_t *h) { return h->merged_weight + h->unmerged_weight; }
+long long td_size(td_histogram_t *h) { return h->merged_weight + h->unmerged_weight; }

double td_cdf(td_histogram_t *h, double val) {
td_compress(h);
@@ -229,18 +246,19 @@ double td_cdf(td_histogram_t *h, double val) {
const int n = h->merged_nodes;
// check for the left tail
const double left_centroid_mean = h->nodes_mean[0];
-const double left_centroid_weight = h->nodes_weight[0];
+const double left_centroid_weight = (double)h->nodes_weight[0];
+const double merged_weight_d = (double)h->merged_weight;
if (val < left_centroid_mean) {
// note that this is different than h->nodes_mean[0] > min
// ... this guarantees we divide by non-zero number and interpolation works
const double width = left_centroid_mean - h->min;
if (width > 0) {
// must be a sample exactly at min
if (val == h->min) {
-return 0.5 / h->merged_weight;
+return 0.5 / merged_weight_d;
} else {
return (1 + (val - h->min) / width * (left_centroid_weight / 2 - 1)) /
-h->merged_weight;
+merged_weight_d;
}
} else {
// this should be redundant with the check val < h->min
@@ -249,16 +267,16 @@
}
// and the right tail
const double right_centroid_mean = h->nodes_mean[n - 1];
-const double right_centroid_weight = h->nodes_weight[n - 1];
+const double right_centroid_weight = (double)h->nodes_weight[n - 1];
if (val > right_centroid_mean) {
const double width = h->max - right_centroid_mean;
if (width > 0) {
if (val == h->max) {
-return 1 - 0.5 / h->merged_weight;
+return 1 - 0.5 / merged_weight_d;
} else {
// there has to be a single sample exactly at max
const double dq = (1 + (h->max - val) / width * (right_centroid_weight / 2 - 1)) /
-h->merged_weight;
+merged_weight_d;
return 1 - dq;
}
} else {
@@ -276,13 +294,13 @@ double td_cdf(td_histogram_t *h, double val) {
// dw will accumulate the weight of all of the centroids at x
double dw = 0;
while (it < n && h->nodes_mean[it] == val) {
-dw += h->nodes_weight[it];
+dw += (double)h->nodes_weight[it];
it++;
}
-return (weightSoFar + dw / 2) / h->merged_weight;
+return (weightSoFar + dw / 2) / (double)h->merged_weight;
} else if (h->nodes_mean[it] <= val && val < h->nodes_mean[it + 1]) {
-const double node_weight = h->nodes_weight[it];
-const double node_weight_next = h->nodes_weight[it + 1];
+const double node_weight = (double)h->nodes_weight[it];
+const double node_weight_next = (double)h->nodes_weight[it + 1];
const double node_mean = h->nodes_mean[it];
const double node_mean_next = h->nodes_mean[it + 1];
// landed between centroids ... check for floating point madness
@@ -297,7 +315,7 @@ double td_cdf(td_histogram_t *h, double val) {
if (node_weight_next == 1) {
// two singletons means no interpolation
// left singleton is in, right is out
-return (weightSoFar + 1) / h->merged_weight;
+return (weightSoFar + 1) / merged_weight_d;
} else {
leftExcludedW = 0.5;
}
@@ -311,19 +329,19 @@

double base = weightSoFar + node_weight / 2 + leftExcludedW;
return (base + dwNoSingleton * (val - node_mean) / (node_mean_next - node_mean)) /
-h->merged_weight;
+merged_weight_d;
} else {
// this is simply caution against floating point madness
// it is conceivable that the centroids will be different
// but too near to allow safe interpolation
double dw = (node_weight + node_weight_next) / 2;
-return (weightSoFar + dw) / h->merged_weight;
+return (weightSoFar + dw) / merged_weight_d;
}
} else {
-weightSoFar += h->nodes_weight[it];
+weightSoFar += (double)h->nodes_weight[it];
}
}
-return 1 - 0.5 / h->merged_weight;
+return 1 - 0.5 / merged_weight_d;
}
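Background on the (double) casts added throughout the quantile math above and below: once a long long weight enters a division as an integer, the fraction is truncated before any floating-point conversion, so the code converts each weight to double up front (merged_weight_d, left_centroid_weight, and friends). A minimal illustration:

    #include <stdio.h>

    int main(void) {
        long long w = 5;                /* a centroid weight under the new representation */
        double truncated = w / 2;       /* long long division runs first: yields 2.0 */
        double exact = (double)w / 2;   /* convert before dividing: yields 2.5 */
        printf("%.1f vs %.1f\n", truncated, exact);
        return 0;
    }
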

static double td_internal_iterate_centroids_to_index(const td_histogram_t *h, const double index,
@@ -342,17 +360,18 @@ static double td_internal_iterate_centroids_to_index(const td_histogram_t *h, co

// if the right-most centroid has more than one sample, we still know
// that one sample occurred at max so we can do some interpolation
-const double right_centroid_weight = h->nodes_weight[total_centroids - 1];
+const double right_centroid_weight = (double)h->nodes_weight[total_centroids - 1];
const double right_centroid_mean = h->nodes_mean[total_centroids - 1];
-if (right_centroid_weight > 1 && h->merged_weight - index <= right_centroid_weight / 2) {
-return h->max - (h->merged_weight - index - 1) / (right_centroid_weight / 2 - 1) *
+if (right_centroid_weight > 1 &&
+(double)h->merged_weight - index <= right_centroid_weight / 2) {
+return h->max - ((double)h->merged_weight - index - 1) / (right_centroid_weight / 2 - 1) *
(h->max - right_centroid_mean);
}

for (; *node_pos < total_centroids - 1; (*node_pos)++) {
const int i = *node_pos;
-const double node_weight = h->nodes_weight[i];
-const double node_weight_next = h->nodes_weight[i + 1];
+const double node_weight = (double)h->nodes_weight[i];
+const double node_weight_next = (double)h->nodes_weight[i + 1];
const double node_mean = h->nodes_mean[i];
const double node_mean_next = h->nodes_mean[i + 1];
const double dw = (node_weight + node_weight_next) / 2;
@@ -402,7 +421,7 @@ double td_quantile(td_histogram_t *h, double q) {
}

// if values were stored in a sorted array, index would be the offset we are interested in
-const double index = q * h->merged_weight;
+const double index = q * (double)h->merged_weight;

// beyond the boundaries, we return min or max
// usually, the first centroid will have unit weight so this will make it moot
@@ -415,7 +434,7 @@

// if the left centroid has more than one sample, we still know
// that one sample occurred at min so we can do some interpolation
-const double left_centroid_weight = h->nodes_weight[0];
+const double left_centroid_weight = (double)h->nodes_weight[0];

// in between extremes we interpolate between centroids
double weightSoFar = left_centroid_weight / 2;
@@ -456,11 +475,7 @@ int td_quantiles(td_histogram_t *h, const double *quantiles, double *values, siz
// we know that there are at least two centroids now
// if the left centroid has more than one sample, we still know
// that one sample occurred at min so we can do some interpolation
-const double left_centroid_weight = h->nodes_weight[0];
-
-// if the right-most centroid has more than one sample, we still know
-// that one sample occurred at max so we can do some interpolation
-const double right_centroid_weight = h->nodes_weight[n - 1];
+const double left_centroid_weight = (double)h->nodes_weight[0];

// in between extremes we interpolate between centroids
double weightSoFar = left_centroid_weight / 2;
@@ -469,7 +484,7 @@
// to avoid allocations we use the values array for intermediate computation
// i.e. to store the expected cumulative count at each percentile
for (size_t qpos = 0; qpos < length; qpos++) {
-const double index = quantiles[qpos] * h->merged_weight;
+const double index = quantiles[qpos] * (double)h->merged_weight;
values[qpos] = td_internal_iterate_centroids_to_index(h, index, left_centroid_weight, n,
&weightSoFar, &node_pos);
}
@@ -483,7 +498,7 @@ static double td_internal_trimmed_mean(const td_histogram_t *h, const double lef
double trimmed_count = 0;
for (int i = 0; i < h->merged_nodes; i++) {

-const double n_weight = h->nodes_weight[i];
+const double n_weight = (double)h->nodes_weight[i];
// Assume the whole centroid falls into the range
double count_add = n_weight;

@@ -521,8 +536,8 @@ double td_trimmed_mean_symmetric(td_histogram_t *h, double proportion_to_cut) {
}

/* translate the percentiles to counts */
-const double leftmost_weight = floor(h->merged_weight * proportion_to_cut);
-const double rightmost_weight = ceil(h->merged_weight * (1.0 - proportion_to_cut));
+const double leftmost_weight = floor((double)h->merged_weight * proportion_to_cut);
+const double rightmost_weight = ceil((double)h->merged_weight * (1.0 - proportion_to_cut));

return td_internal_trimmed_mean(h, leftmost_weight, rightmost_weight);
}
@@ -540,13 +555,13 @@ double td_trimmed_mean(td_histogram_t *h, double leftmost_cut, double rightmost_
}

/* translate the percentiles to counts */
-const double leftmost_weight = floor(h->merged_weight * leftmost_cut);
-const double rightmost_weight = ceil(h->merged_weight * rightmost_cut);
+const double leftmost_weight = floor((double)h->merged_weight * leftmost_cut);
+const double rightmost_weight = ceil((double)h->merged_weight * rightmost_cut);

return td_internal_trimmed_mean(h, leftmost_weight, rightmost_weight);
}

-int td_add(td_histogram_t *h, double mean, double weight) {
+int td_add(td_histogram_t *h, double mean, long long weight) {
if (should_td_compress(h)) {
const int overflow_res = td_compress(h);
if (overflow_res != 0)
@@ -555,10 +570,15 @@
const int pos = next_node(h);
if (pos >= h->cap)
return EDOM;
-const double new_unmerged_weight = h->unmerged_weight + weight;
-const double new_total_weight = new_unmerged_weight + h->merged_weight;
+if (_tdigest_long_long_add_safe(h->unmerged_weight, weight) == false)
+return EDOM;
+const long long new_unmerged_weight = h->unmerged_weight + weight;
+if (_tdigest_long_long_add_safe(new_unmerged_weight, h->merged_weight) == false)
+return EDOM;
+const long long new_total_weight = new_unmerged_weight + h->merged_weight;
// double-precision overflow detected
-const int overflow_res = _check_td_overflow(new_unmerged_weight, new_total_weight);
+const int overflow_res =
+_check_td_overflow((double)new_unmerged_weight, (double)new_total_weight);
if (overflow_res != 0)
return overflow_res;

@@ -581,9 +601,9 @@ int td_compress(td_histogram_t *h) {
}
int N = h->merged_nodes + h->unmerged_nodes;
td_qsort(h->nodes_mean, h->nodes_weight, 0, N - 1);
-const double total_weight = h->merged_weight + h->unmerged_weight;
+const double total_weight = (double)h->merged_weight + (double)h->unmerged_weight;
// double-precision overflow detected
-const int overflow_res = _check_td_overflow(h->unmerged_weight, total_weight);
+const int overflow_res = _check_td_overflow((double)h->unmerged_weight, (double)total_weight);
if (overflow_res != 0)
return overflow_res;
if (total_weight <= 1)
@@ -600,7 +620,7 @@ int td_compress(td_histogram_t *h) {
double weight_so_far = 0;

for (int i = 1; i < N; i++) {
-const double proposed_weight = h->nodes_weight[cur] + h->nodes_weight[i];
+const double proposed_weight = (double)h->nodes_weight[cur] + (double)h->nodes_weight[i];
const double z = proposed_weight * normalizer;
// quantile up to cur
const double q0 = weight_so_far / total_weight;
@@ -622,7 +642,7 @@ int td_compress(td_histogram_t *h) {
h->nodes_mean[cur] = h->nodes_mean[i];
}
if (cur != i) {
-h->nodes_weight[i] = 0.0;
+h->nodes_weight[i] = 0;
h->nodes_mean[i] = 0.0;
}
}
@@ -640,16 +660,11 @@ double td_max(td_histogram_t *h) { return h->max; }

int td_compression(td_histogram_t *h) { return h->compression; }

-const double *td_centroids_weight(td_histogram_t *h) { return h->nodes_weight; }
+const long long *td_centroids_weight(td_histogram_t *h) { return h->nodes_weight; }

const double *td_centroids_mean(td_histogram_t *h) { return h->nodes_mean; }

-double td_centroids_weight_at(td_histogram_t *h, int pos) {
-if (pos < 0 || pos > h->merged_nodes) {
-return NAN;
-}
-return h->nodes_weight[pos];
-}
+long long td_centroids_weight_at(td_histogram_t *h, int pos) { return h->nodes_weight[pos]; }
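Note on the dropped bounds check above: with an integral return type there is no NAN sentinel to hand back for a bad index, so td_centroids_weight_at now assumes pos is in range and callers must validate it before the call.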

double td_centroids_mean_at(td_histogram_t *h, int pos) {
if (pos < 0 || pos > h->merged_nodes) {