Skip to content
3 changes: 3 additions & 0 deletions src/commands/cmd_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const std::unordered_map<TSAggregatorType, std::string_view> kAggregatorTypeMap
{TSAggregatorType::MAX, "max"}, {TSAggregatorType::RANGE, "range"}, {TSAggregatorType::COUNT, "count"},
{TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"}, {TSAggregatorType::STD_P, "std.p"},
{TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"}, {TSAggregatorType::VAR_S, "var.s"},
{TSAggregatorType::TWA, "twa"},
};
const std::unordered_map<GroupReducerType, std::string_view> kGroupReducerTypeMap = {
{GroupReducerType::AVG, "avg"}, {GroupReducerType::SUM, "sum"}, {GroupReducerType::MIN, "min"},
Expand Down Expand Up @@ -601,6 +602,8 @@ class CommandTSAggregatorBase : public KeywordCommandBase {
type = TSAggregatorType::VAR_P;
} else if (parser.EatEqICase("VAR.S")) {
type = TSAggregatorType::VAR_S;
} else if (parser.EatEqICase("TWA")) {
type = TSAggregatorType::TWA;
} else {
return {Status::RedisParseErr, "Invalid aggregator type"};
}
Expand Down
180 changes: 169 additions & 11 deletions src/types/redis_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,21 @@ struct Reducer {
[](const TSSample &a, const TSSample &b) { return a.v < b.v; });
return max->v - min->v;
}
static inline double Area(nonstd::span<const TSSample> samples) {
// Intra bucket area is 0 for single element.
double result = 0;
for (size_t i = 1; i < samples.size(); i++) {
auto t_diff = static_cast<double>(samples[i].ts - samples[i - 1].ts);
// Area of bottom rectangle + Area of above triangle
result += (t_diff * samples[i - 1].v) + (t_diff * (samples[i].v - samples[i - 1].v) * 0.5);
}
return result;
}
};

std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option) {
std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option,
const TWABounds &twa_bounds) {
const auto &aggregator = option.aggregator;
std::vector<TSSample> res;
if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
res = std::move(samples);
return res;
}
auto spans = aggregator.SplitSamplesToBuckets(samples);

auto get_bucket_ts = [&](uint64_t left) -> uint64_t {
using BucketTimestampType = TSRangeOption::BucketTimestampType;
Expand All @@ -97,7 +102,99 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
}
return 0;
};
// Linear interpolation.
auto interpolate_sample = [](const TSSample &left_nb, uint64_t ts, const TSSample &right_nb) {
auto y_diff = right_nb.v - left_nb.v;
auto x_diff = static_cast<double>(right_nb.ts - left_nb.ts);
auto x_diff_prime = static_cast<double>(ts - left_nb.ts);
auto y_diff_prime = (x_diff_prime * y_diff) / x_diff;
TSSample sample;
sample.ts = ts;
sample.v = y_diff_prime + left_nb.v;
return sample;
};
// Computes the TWA of empty bucket from its neighbor samples.
auto empty_bucket_twa = [&interpolate_sample](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right,
const TSSample &right_nb) {
auto left = interpolate_sample(left_nb, bucket_left, right_nb);
auto right = interpolate_sample(left_nb, bucket_right, right_nb);
return Reducer::Area(std::array<TSSample, 2>{left, right}) / static_cast<double>(bucket_right - bucket_left);
};

TSSample prev_sample, next_sample;
bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false;
if (is_twa_aggregator) {
const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value();
next_sample = twa_bounds.next_sample;
prev_sample = twa_bounds.prev_sample;
// When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples.
prev_available = discard_boundaries ? false : !samples.empty() && (samples.front().ts != prev_sample.ts);
next_available = discard_boundaries ? false : !samples.empty() && (samples.back().ts != next_sample.ts);
}
std::vector<TSSample> res;
if (is_twa_aggregator && option.is_return_empty && samples.empty()) {
const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP || next_sample.ts == TSSample::MAX_TIMESTAMP ||
prev_sample.ts == next_sample.ts; // When filter entire range lies left or right to data.
if (early_return) {
res = std::move(samples);
return res;
}

uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / option.aggregator.bucket_duration;
res.reserve(n_buckets_estimate + 1);
uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(option.start_ts);
uint64_t bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
for (size_t i = 0; i < n_buckets_estimate; i++) {
bucket_left = std::max(bucket_left, option.start_ts);
bucket_right = std::min(bucket_right, option.end_ts);
TSSample sample;
sample.ts = bucket_left;
sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample);
res.push_back(sample);
bucket_left = bucket_right;
bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
}
// Process last bucket.
TSSample sample;
sample.ts = bucket_left;
if (bucket_left == option.end_ts) { // Calculate last sample.
sample.v = interpolate_sample(prev_sample, option.end_ts, next_sample).v;
} else {
sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample);
}
res.push_back(sample);
return res;
} else if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
res = std::move(samples);
return res;
}

auto spans = aggregator.SplitSamplesToBuckets(samples);
res.reserve(spans.size());

auto non_empty_left_bucket_idx = [&spans](size_t curr) {
while (--curr && spans[curr].empty());
return curr;
};
auto non_empty_right_bucket_idx = [&spans](size_t curr) {
while (++curr < spans.size() && spans[curr].empty());
return curr;
};

size_t sz = spans.size() - 1;
std::vector<std::pair<TSSample, TSSample>> neighbors(spans.size());
neighbors[0].first = prev_sample;
neighbors[sz].second = next_sample;
if (spans.size() > 1) {
neighbors[0].second = spans[non_empty_right_bucket_idx(0)].front();
neighbors[sz].first = spans[non_empty_left_bucket_idx(sz)].back();
}
sz--;
for (size_t i = 1; i < spans.size() - 1; i++, sz--) {
neighbors[i].first = spans[i - 1].empty() ? neighbors[i - 1].first : spans[i - 1].back();
neighbors[sz].second = spans[sz + 1].empty() ? neighbors[sz + 1].second : spans[sz + 1].front();
}

uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts);
for (size_t i = 0; i < spans.size(); i++) {
if (option.count_limit && res.size() >= option.count_limit) {
Expand All @@ -114,6 +211,14 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
case TSAggregatorType::COUNT:
sample.v = 0;
break;
case TSAggregatorType::TWA:
if ((i == 0 && !prev_available) || (i == spans.size() - 1 && !next_available)) {
sample.v = TSSample::NAN_VALUE;
} else {
auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
sample.v = empty_bucket_twa(neighbors[i].first, bucket_left, bucket_right, neighbors[i].second);
}
break;
case TSAggregatorType::LAST:
if (i == 0 || spans[i - 1].empty()) {
sample.v = TSSample::NAN_VALUE;
Expand All @@ -126,6 +231,32 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
}
} else if (!spans[i].empty()) {
sample.v = aggregator.AggregateSamplesValue(spans[i]);

if (is_twa_aggregator) {
auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left);
// Cut left and right empty regions. In case of first and last bucket.
bucket_left = std::max(bucket_left, option.start_ts);
bucket_right = std::min(bucket_right, option.end_ts);
// Front area available iff prev_sample < bucket_left < span[i].front(). Similarly for end_area.
bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left);
bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts);
double area = 0;
uint64_t l = spans[i].front().ts;
uint64_t r = spans[i].back().ts;
if (front_available) {
TSSample left_sample = interpolate_sample(neighbors[i].first, bucket_left, spans[i].front());
area += Reducer::Area(std::array<TSSample, 2>{left_sample, spans[i].front()});
l = bucket_left;
}
if (back_available) {
TSSample right_sample = interpolate_sample(spans[i].back(), bucket_right, neighbors[i].second);
area += Reducer::Area(std::array<TSSample, 2>{spans[i].back(), right_sample});
r = bucket_right;
}
// Edge case: If single bucket and it contains only one element.
area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0;
sample.v = (sample.v + area) / std::max(static_cast<double>(r - l), 1.0);
}
} else {
continue;
}
Expand Down Expand Up @@ -810,6 +941,9 @@ double TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample> samples)
case TSAggregatorType::VAR_S:
res = Reducer::VarS(samples);
break;
case TSAggregatorType::TWA:
res = Reducer::Area(samples);
break;
default:
unreachable();
}
Expand Down Expand Up @@ -1055,18 +1189,24 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke
bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
if (iter->Valid()) {
if (option.count_limit != 0 && !has_aggregator) {
temp_results.reserve(option.count_limit);
temp_results.reserve(option.count_limit + 2);
} else {
chunk = CreateTSChunkFromData(iter->value());
auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
auto estimate_chunks = std::min((end_timestamp - start_timestamp) / range, uint64_t(32));
temp_results.reserve(estimate_chunks * metadata.chunk_size);
temp_results.reserve(estimate_chunks * metadata.chunk_size + 2);
}
}
// Get samples from chunks
uint64_t bucket_count = 0;
uint64_t last_bucket = 0;
bool is_not_enough = true;
// Add these two samples at end when aggregator is TWA.
TSSample prev_sample, next_sample;
prev_sample.ts = TSSample::MAX_TIMESTAMP;
next_sample.ts = TSSample::MAX_TIMESTAMP;
const bool is_twa_aggregator = option.aggregator.type == TSAggregatorType::TWA;

for (; iter->Valid() && is_not_enough; iter->Next()) {
chunk = CreateTSChunkFromData(iter->value());
auto it = chunk->CreateIterator();
Expand All @@ -1081,7 +1221,12 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke
const bool not_time_filtered = option.filter_by_ts.empty() || option.filter_by_ts.count(sample->ts);
const bool value_in_range = !option.filter_by_value || (sample->v >= option.filter_by_value->first &&
sample->v <= option.filter_by_value->second);

// Record prev and next samples around the filtered range when aggregator is TWA
if (is_twa_aggregator) {
prev_sample = (sample->ts <= start_timestamp) ? *sample : prev_sample;
next_sample =
(sample->ts >= end_timestamp && next_sample.ts == TSSample::MAX_TIMESTAMP) ? *sample : next_sample;
}
if (!in_time_range || !not_time_filtered || !value_in_range) {
continue;
}
Expand All @@ -1103,8 +1248,21 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke
}
}

TWABounds twa_bounds;
if (is_twa_aggregator) {
// If the first element of the series is in first bucket, prev_sample might not get initialized. Similarly if the
// last element in the series is in last bucket, next_sample might not get initialized. If the series is empty,
// prev_sample and next_sample points to infinity (MAX_TIMESTAMP)
prev_sample =
prev_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.front() : prev_sample;
next_sample =
next_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.back() : next_sample;
twa_bounds.prev_sample = prev_sample;
twa_bounds.next_sample = next_sample;
}

// Process compaction logic
*res = AggregateSamplesByRangeOption(std::move(temp_results), option);
*res = AggregateSamplesByRangeOption(std::move(temp_results), option, twa_bounds);

return rocksdb::Status::OK();
}
Expand Down
6 changes: 6 additions & 0 deletions src/types/redis_timeseries.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ enum class TSAggregatorType : uint8_t {
STD_S = 10,
VAR_P = 11,
VAR_S = 12,
TWA = 13,
};

inline bool IsIncrementalAggregatorType(TSAggregatorType type) {
Expand Down Expand Up @@ -88,6 +89,11 @@ struct TSAggregator {
double AggregateSamplesValue(nonstd::span<const TSSample> samples) const;
};

struct TWABounds {
TSSample prev_sample;
TSSample next_sample;
};

struct TSDownStreamMeta {
TSAggregator aggregator;
uint64_t latest_bucket_idx;
Expand Down
Loading
Loading