Skip to content
3 changes: 3 additions & 0 deletions src/commands/cmd_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const std::unordered_map<TSAggregatorType, std::string_view> kAggregatorTypeMap
{TSAggregatorType::MAX, "max"}, {TSAggregatorType::RANGE, "range"}, {TSAggregatorType::COUNT, "count"},
{TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"}, {TSAggregatorType::STD_P, "std.p"},
{TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"}, {TSAggregatorType::VAR_S, "var.s"},
{TSAggregatorType::TWA, "twa"},
};
const std::unordered_map<GroupReducerType, std::string_view> kGroupReducerTypeMap = {
{GroupReducerType::AVG, "avg"}, {GroupReducerType::SUM, "sum"}, {GroupReducerType::MIN, "min"},
Expand Down Expand Up @@ -601,6 +602,8 @@ class CommandTSAggregatorBase : public KeywordCommandBase {
type = TSAggregatorType::VAR_P;
} else if (parser.EatEqICase("VAR.S")) {
type = TSAggregatorType::VAR_S;
} else if (parser.EatEqICase("TWA")) {
type = TSAggregatorType::TWA;
} else {
return {Status::RedisParseErr, "Invalid aggregator type"};
}
Expand Down
181 changes: 172 additions & 9 deletions src/types/redis_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,20 @@
[](const TSSample &a, const TSSample &b) { return a.v < b.v; });
return max->v - min->v;
}
/// Computes the area under the piecewise-linear curve through `samples`
/// (trapezoidal rule over each consecutive pair of samples).
/// A span with fewer than two samples has no intra-bucket area and yields 0.
static inline double Area(nonstd::span<const TSSample> samples) {
  double total = 0;
  for (size_t idx = 1; idx < samples.size(); ++idx) {
    const TSSample &prev = samples[idx - 1];
    const TSSample &curr = samples[idx];
    auto dt = static_cast<double>(curr.ts - prev.ts);
    // Trapezoid between the two samples: bottom rectangle plus the triangle on top.
    total += (dt * prev.v) + (dt * (curr.v - prev.v) * 0.5);
  }
  return total;
}
};

std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option) {

Check failure on line 87 in src/types/redis_timeseries.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 90 to the 25 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZqVdTX5hIk-jR3MX6tH&open=AZqVdTX5hIk-jR3MX6tH&pullRequest=3262
const auto &aggregator = option.aggregator;
std::vector<TSSample> res;
if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
res = std::move(samples);
return res;
}
auto spans = aggregator.SplitSamplesToBuckets(samples);

auto get_bucket_ts = [&](uint64_t left) -> uint64_t {
using BucketTimestampType = TSRangeOption::BucketTimestampType;
Expand All @@ -97,7 +101,114 @@
}
return 0;
};
/// Computes area of polygon from start of the current bucket to the first sample of the current span.
/// Total Area = Area of bottom rectangle + Area of above triangle.
auto front_area = [](uint64_t bucket_left, const TSSample &prev, const TSSample &curr) {
auto x = static_cast<double>(bucket_left - prev.ts); // Distance from
auto y = static_cast<double>(curr.ts - prev.ts);
auto z = curr.v - prev.v;
auto triangle_area = (z * (y - (x * x) / y)) / 2;
auto rect_area = static_cast<double>(y - x) * prev.v;

Check warning on line 111 in src/types/redis_timeseries.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this redundant cast.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZqVdTX5hIk-jR3MX6tO&open=AZqVdTX5hIk-jR3MX6tO&pullRequest=3262
return triangle_area + rect_area;
};
/// Computes area of polygon from the last sample of the current span to the end of current bucket.
/// Total Area = Area of bottom rectangle + Area of above triangle.
auto end_area = [](uint64_t bucket_right, const TSSample &curr, const TSSample &next) {
auto x = static_cast<double>(bucket_right - curr.ts);
auto y = static_cast<double>(next.ts - curr.ts);
auto z = next.v - curr.v;
auto rect_area = x * curr.v;
auto triangle_area = (x * x * z) / (2 * y);
return triangle_area + rect_area;
};
// Computes the TWA of empty bucket from its neighbor samples.
auto empty_bucket_twa = [&front_area](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right,
const TSSample &right_nb) {
// Area of empty bucket = Area from left_nb to bucket_right - Area from left_nb to bucket_left
auto f_area = front_area(bucket_left, left_nb, right_nb);
auto s_area = front_area(bucket_right, left_nb, right_nb);
return (f_area - s_area) / static_cast<double>(bucket_right - bucket_left);
};

// Retrieve prev_sample and next_sample from samples when TWA aggregation.
TSSample prev_sample, next_sample;

Check warning on line 134 in src/types/redis_timeseries.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define each identifier in a dedicated statement.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZqVdTX5hIk-jR3MX6tL&open=AZqVdTX5hIk-jR3MX6tL&pullRequest=3262
bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false;

Check warning on line 135 in src/types/redis_timeseries.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define each identifier in a dedicated statement.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZqVdTX5hIk-jR3MX6tM&open=AZqVdTX5hIk-jR3MX6tM&pullRequest=3262
if (is_twa_aggregator) {
const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value();
next_sample = samples.back();
samples.pop_back();
prev_sample = samples.back();
samples.pop_back();
// When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples.
prev_available = discard_boundaries ? false : !samples.empty() && (samples.front().ts != prev_sample.ts);
next_available = discard_boundaries ? false : !samples.empty() && (samples.back().ts != next_sample.ts);
}
Comment on lines 124 to 133
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can pass next_sample, next_available, etc. as function parameters instead of calculating them inside the AggregateSamplesByRangeOption function. For example, we can add a struct:

struct TWABounds {
    std::optional<TSSample> prev_sample;
    std::optional<TSSample> next_sample;
};

And modify the function interface to:

AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option, const TWABounds&)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @yezhizi, `prev_sample` and `next_sample` are properly initialized as the neighbors of the filtered range as long as the entire dataset is not empty. Otherwise, both are initialized with MAX_TIMESTAMP.

I declared the struct as

struct TWABounds{
    TSSample prev_sample;
    TSSample next_sample;
}

Is there any other reason for them to be declared as optional?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using MAX_TIMESTAMP is fine. I just think std::optional has clearer semantics to explicitly indicate the presence or absence of a boundary sample. : )

std::vector<TSSample> res;
if (is_twa_aggregator && option.is_return_empty && samples.empty()) {
const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP || next_sample.ts == TSSample::MAX_TIMESTAMP ||
prev_sample.ts == next_sample.ts; // When filter entire range lies left or right to data.
if (early_return) {

Check warning on line 150 in src/types/redis_timeseries.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use the init-statement to declare "early_return" inside the if statement.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZrFDctEPEvpOt4LEWa-&open=AZrFDctEPEvpOt4LEWa-&pullRequest=3262
res = std::move(samples);
return res;
}
// Both prev and next should be available. Total range should be in between the prev and next samples.
assert(prev_sample.ts <= option.start_ts && option.end_ts <= next_sample.ts);

uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / option.aggregator.bucket_duration;
res.reserve(n_buckets_estimate + 1);
uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(option.start_ts);
uint64_t bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
for (size_t i = 0; i < n_buckets_estimate; i++) {
bucket_left = std::max(bucket_left, option.start_ts);
bucket_right = std::min(bucket_right, option.end_ts);
TSSample sample;
sample.ts = bucket_left;
sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample);
res.push_back(sample);
bucket_left = bucket_right;
bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
}
// Process last bucket.
TSSample sample;
sample.ts = bucket_left;
if (bucket_left == option.end_ts) { // Calculate last sample.
double y_diff = next_sample.v - prev_sample.v;
auto x_diff = static_cast<double>(next_sample.ts - prev_sample.ts);
auto x_prime_diff = static_cast<double>(option.end_ts - prev_sample.ts);
double y_prime_diff = (x_prime_diff * y_diff) / x_diff;
sample.v = y_prime_diff + prev_sample.v;
} else {
sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample);
}
res.push_back(sample);
return res;
} else if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
res = std::move(samples);
return res;
}

auto spans = aggregator.SplitSamplesToBuckets(samples);
res.reserve(spans.size());

auto non_empty_left_bucket_idx = [&spans](size_t curr) {
while (--curr && spans[curr].empty());
return curr;
};
auto non_empty_right_bucket_idx = [&spans](size_t curr) {
while (++curr < spans.size() && spans[curr].empty());
return curr;
};

std::vector<std::pair<TSSample, TSSample>> neighbors;
neighbors.reserve(spans.size());
for (size_t i = 0; i < spans.size(); i++) {
TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : prev_sample;
TSSample next = (i != (spans.size() - 1)) ? spans[non_empty_right_bucket_idx(i)].front() : next_sample;
neighbors.emplace_back(prev, next);
assert(spans[i].empty() ||
(neighbors[i].first.ts <= spans[i].front().ts && spans[i].back().ts <= neighbors[i].second.ts));
} // Should follow: neighbors[i].first <= span[i].front() <= span[i].back() <= neighbors[i].second;

uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts);
for (size_t i = 0; i < spans.size(); i++) {
if (option.count_limit && res.size() >= option.count_limit) {
Expand All @@ -114,6 +225,14 @@
case TSAggregatorType::COUNT:
sample.v = 0;
break;
case TSAggregatorType::TWA:
if ((i == 0 && !prev_available) || (i == spans.size() - 1 && !next_available)) {

Check failure on line 229 in src/types/redis_timeseries.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this code to not nest more than 3 if|for|do|while|switch statements.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZrFDctEPEvpOt4LEWa_&open=AZrFDctEPEvpOt4LEWa_&pullRequest=3262
sample.v = TSSample::NAN_VALUE;
} else {
auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
sample.v = empty_bucket_twa(neighbors[i].first, bucket_left, bucket_right, neighbors[i].second);
}
break;
case TSAggregatorType::LAST:
if (i == 0 || spans[i - 1].empty()) {
sample.v = TSSample::NAN_VALUE;
Expand All @@ -126,6 +245,24 @@
}
} else if (!spans[i].empty()) {
sample.v = aggregator.AggregateSamplesValue(spans[i]);

if (is_twa_aggregator) {
auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
// Cut left and right empty regions. In case of first and last bucket.
bucket_left = std::max(bucket_left, option.start_ts);
bucket_right = std::min(bucket_right, option.end_ts);
// Front area available iff prev_sample < bucket_left < span[i].front(). Similarly for end_area.
bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left);
bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts);
double area = 0;
area += front_available ? front_area(bucket_left, neighbors[i].first, spans[i].front()) : 0.0;
area += back_available ? end_area(bucket_right, spans[i].back(), neighbors[i].second) : 0.0;
// Edge case: If single bucket and it contains only one element.
area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0;
uint64_t l = front_available ? bucket_left : spans[i].front().ts;
uint64_t r = back_available ? bucket_right : spans[i].back().ts;
sample.v = (sample.v + area) / std::max(static_cast<double>(r - l), 1.0);
}
} else {
continue;
}
Expand Down Expand Up @@ -810,6 +947,9 @@
case TSAggregatorType::VAR_S:
res = Reducer::VarS(samples);
break;
case TSAggregatorType::TWA:
res = Reducer::Area(samples);
break;
default:
unreachable();
}
Expand Down Expand Up @@ -1055,18 +1195,24 @@
bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
if (iter->Valid()) {
if (option.count_limit != 0 && !has_aggregator) {
temp_results.reserve(option.count_limit);
temp_results.reserve(option.count_limit + 2);
} else {
chunk = CreateTSChunkFromData(iter->value());
auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
auto estimate_chunks = std::min((end_timestamp - start_timestamp) / range, uint64_t(32));
temp_results.reserve(estimate_chunks * metadata.chunk_size);
temp_results.reserve(estimate_chunks * metadata.chunk_size + 2);
}
}
// Get samples from chunks
uint64_t bucket_count = 0;
uint64_t last_bucket = 0;
bool is_not_enough = true;
// Add these two samples at end when aggregator is TWA.
TSSample prev_sample, next_sample;

Check warning on line 1211 in src/types/redis_timeseries.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define each identifier in a dedicated statement.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZqVdTX5hIk-jR3MX6tQ&open=AZqVdTX5hIk-jR3MX6tQ&pullRequest=3262
prev_sample.ts = TSSample::MAX_TIMESTAMP;
next_sample.ts = TSSample::MAX_TIMESTAMP;
const bool is_twa_aggregator = option.aggregator.type == TSAggregatorType::TWA;

for (; iter->Valid() && is_not_enough; iter->Next()) {
chunk = CreateTSChunkFromData(iter->value());
auto it = chunk->CreateIterator();
Expand All @@ -1081,7 +1227,12 @@
const bool not_time_filtered = option.filter_by_ts.empty() || option.filter_by_ts.count(sample->ts);
const bool value_in_range = !option.filter_by_value || (sample->v >= option.filter_by_value->first &&
sample->v <= option.filter_by_value->second);

// Record prev and next samples around the filtered range when aggregator is TWA
if (is_twa_aggregator) {
prev_sample = (sample->ts <= start_timestamp) ? *sample : prev_sample;
next_sample =
(sample->ts >= end_timestamp && next_sample.ts == TSSample::MAX_TIMESTAMP) ? *sample : next_sample;
}
if (!in_time_range || !not_time_filtered || !value_in_range) {
continue;
}
Expand All @@ -1103,6 +1254,18 @@
}
}

if (is_twa_aggregator) {
// If the first element of the series is in first bucket, prev_sample might not get initialized. Similarly if the
// last element in the series is in last bucket, next_sample might not get initialized. If the series is empty,
// prev_sample and next_sample points to infinity (MAX_TIMESTAMP)
prev_sample =
prev_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.front() : prev_sample;
next_sample =
next_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.back() : next_sample;
temp_results.push_back(prev_sample);
temp_results.push_back(next_sample);
}

// Process compaction logic
*res = AggregateSamplesByRangeOption(std::move(temp_results), option);

Expand Down
1 change: 1 addition & 0 deletions src/types/redis_timeseries.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ enum class TSAggregatorType : uint8_t {
STD_S = 10,
VAR_P = 11,
VAR_S = 12,
TWA = 13,
};

inline bool IsIncrementalAggregatorType(TSAggregatorType type) {
Expand Down
Loading