diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc index 39e5becde31..8fdcdb0a0b6 100644 --- a/src/commands/cmd_timeseries.cc +++ b/src/commands/cmd_timeseries.cc @@ -57,6 +57,7 @@ const std::unordered_map kAggregatorTypeMap {TSAggregatorType::MAX, "max"}, {TSAggregatorType::RANGE, "range"}, {TSAggregatorType::COUNT, "count"}, {TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"}, {TSAggregatorType::STD_P, "std.p"}, {TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"}, {TSAggregatorType::VAR_S, "var.s"}, + {TSAggregatorType::TWA, "twa"}, }; const std::unordered_map kGroupReducerTypeMap = { {GroupReducerType::AVG, "avg"}, {GroupReducerType::SUM, "sum"}, {GroupReducerType::MIN, "min"}, @@ -601,6 +602,8 @@ class CommandTSAggregatorBase : public KeywordCommandBase { type = TSAggregatorType::VAR_P; } else if (parser.EatEqICase("VAR.S")) { type = TSAggregatorType::VAR_S; + } else if (parser.EatEqICase("TWA")) { + type = TSAggregatorType::TWA; } else { return {Status::RedisParseErr, "Invalid aggregator type"}; } diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index fa14fe03991..09f796e50bf 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -72,16 +72,21 @@ struct Reducer { [](const TSSample &a, const TSSample &b) { return a.v < b.v; }); return max->v - min->v; } + static inline double Area(nonstd::span samples) { + // Intra bucket area is 0 for single element. + double result = 0; + for (size_t i = 1; i < samples.size(); i++) { + auto t_diff = static_cast(samples[i].ts - samples[i - 1].ts); + // Area of bottom rectangle + Area of above triangle + result += (t_diff * samples[i - 1].v) + (t_diff * (samples[i].v - samples[i - 1].v) * 0.5); + } + return result; + } }; -std::vector AggregateSamplesByRangeOption(std::vector samples, const TSRangeOption &option) { +std::vector AggregateSamplesByRangeOption(std::vector samples, const TSRangeOption &option, + const TWABounds &twa_bounds) { const auto &aggregator = option.aggregator; - std::vector res; - if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { - res = std::move(samples); - return res; - } - auto spans = aggregator.SplitSamplesToBuckets(samples); auto get_bucket_ts = [&](uint64_t left) -> uint64_t { using BucketTimestampType = TSRangeOption::BucketTimestampType; @@ -97,7 +102,99 @@ std::vector AggregateSamplesByRangeOption(std::vector sample } return 0; }; + // Linear interpolation. + auto interpolate_sample = [](const TSSample &left_nb, uint64_t ts, const TSSample &right_nb) { + auto y_diff = right_nb.v - left_nb.v; + auto x_diff = static_cast(right_nb.ts - left_nb.ts); + auto x_diff_prime = static_cast(ts - left_nb.ts); + auto y_diff_prime = (x_diff_prime * y_diff) / x_diff; + TSSample sample; + sample.ts = ts; + sample.v = y_diff_prime + left_nb.v; + return sample; + }; + // Computes the TWA of empty bucket from its neighbor samples. + auto empty_bucket_twa = [&interpolate_sample](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right, + const TSSample &right_nb) { + auto left = interpolate_sample(left_nb, bucket_left, right_nb); + auto right = interpolate_sample(left_nb, bucket_right, right_nb); + return Reducer::Area(std::array{left, right}) / static_cast(bucket_right - bucket_left); + }; + + TSSample prev_sample, next_sample; + bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false; + if (is_twa_aggregator) { + const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value(); + next_sample = twa_bounds.next_sample; + prev_sample = twa_bounds.prev_sample; + // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples. + prev_available = discard_boundaries ? false : !samples.empty() && (samples.front().ts != prev_sample.ts); + next_available = discard_boundaries ? false : !samples.empty() && (samples.back().ts != next_sample.ts); + } + std::vector res; + if (is_twa_aggregator && option.is_return_empty && samples.empty()) { + const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP || next_sample.ts == TSSample::MAX_TIMESTAMP || + prev_sample.ts == next_sample.ts; // When filter entire range lies left or right to data. + if (early_return) { + res = std::move(samples); + return res; + } + + uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / option.aggregator.bucket_duration; + res.reserve(n_buckets_estimate + 1); + uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(option.start_ts); + uint64_t bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + for (size_t i = 0; i < n_buckets_estimate; i++) { + bucket_left = std::max(bucket_left, option.start_ts); + bucket_right = std::min(bucket_right, option.end_ts); + TSSample sample; + sample.ts = bucket_left; + sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample); + res.push_back(sample); + bucket_left = bucket_right; + bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + } + // Process last bucket. + TSSample sample; + sample.ts = bucket_left; + if (bucket_left == option.end_ts) { // Calculate last sample. + sample.v = interpolate_sample(prev_sample, option.end_ts, next_sample).v; + } else { + sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample); + } + res.push_back(sample); + return res; + } else if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { + res = std::move(samples); + return res; + } + + auto spans = aggregator.SplitSamplesToBuckets(samples); res.reserve(spans.size()); + + auto non_empty_left_bucket_idx = [&spans](size_t curr) { + while (--curr && spans[curr].empty()); + return curr; + }; + auto non_empty_right_bucket_idx = [&spans](size_t curr) { + while (++curr < spans.size() && spans[curr].empty()); + return curr; + }; + + size_t sz = spans.size() - 1; + std::vector> neighbors(spans.size()); + neighbors[0].first = prev_sample; + neighbors[sz].second = next_sample; + if (spans.size() > 1) { + neighbors[0].second = spans[non_empty_right_bucket_idx(0)].front(); + neighbors[sz].first = spans[non_empty_left_bucket_idx(sz)].back(); + } + sz--; + for (size_t i = 1; i < spans.size() - 1; i++, sz--) { + neighbors[i].first = spans[i - 1].empty() ? neighbors[i - 1].first : spans[i - 1].back(); + neighbors[sz].second = spans[sz + 1].empty() ? neighbors[sz + 1].second : spans[sz + 1].front(); + } + uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts); for (size_t i = 0; i < spans.size(); i++) { if (option.count_limit && res.size() >= option.count_limit) { @@ -114,6 +211,14 @@ std::vector AggregateSamplesByRangeOption(std::vector sample case TSAggregatorType::COUNT: sample.v = 0; break; + case TSAggregatorType::TWA: + if ((i == 0 && !prev_available) || (i == spans.size() - 1 && !next_available)) { + sample.v = TSSample::NAN_VALUE; + } else { + auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + sample.v = empty_bucket_twa(neighbors[i].first, bucket_left, bucket_right, neighbors[i].second); + } + break; case TSAggregatorType::LAST: if (i == 0 || spans[i - 1].empty()) { sample.v = TSSample::NAN_VALUE; @@ -126,6 +231,32 @@ std::vector AggregateSamplesByRangeOption(std::vector sample } } else if (!spans[i].empty()) { sample.v = aggregator.AggregateSamplesValue(spans[i]); + + if (is_twa_aggregator) { + auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + // Cut left and right empty regions. In case of first and last bucket. + bucket_left = std::max(bucket_left, option.start_ts); + bucket_right = std::min(bucket_right, option.end_ts); + // Front area available iff prev_sample < bucket_left < span[i].front(). Similarly for end_area. + bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left); + bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts); + double area = 0; + uint64_t l = spans[i].front().ts; + uint64_t r = spans[i].back().ts; + if (front_available) { + TSSample left_sample = interpolate_sample(neighbors[i].first, bucket_left, spans[i].front()); + area += Reducer::Area(std::array{left_sample, spans[i].front()}); + l = bucket_left; + } + if (back_available) { + TSSample right_sample = interpolate_sample(spans[i].back(), bucket_right, neighbors[i].second); + area += Reducer::Area(std::array{spans[i].back(), right_sample}); + r = bucket_right; + } + // Edge case: If single bucket and it contains only one element. + area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0; + sample.v = (sample.v + area) / std::max(static_cast(r - l), 1.0); + } } else { continue; } @@ -810,6 +941,9 @@ double TSAggregator::AggregateSamplesValue(nonstd::span samples) case TSAggregatorType::VAR_S: res = Reducer::VarS(samples); break; + case TSAggregatorType::TWA: + res = Reducer::Area(samples); + break; default: unreachable(); } @@ -1055,18 +1189,24 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke bool has_aggregator = aggregator.type != TSAggregatorType::NONE; if (iter->Valid()) { if (option.count_limit != 0 && !has_aggregator) { - temp_results.reserve(option.count_limit); + temp_results.reserve(option.count_limit + 2); } else { chunk = CreateTSChunkFromData(iter->value()); auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1; auto estimate_chunks = std::min((end_timestamp - start_timestamp) / range, uint64_t(32)); - temp_results.reserve(estimate_chunks * metadata.chunk_size); + temp_results.reserve(estimate_chunks * metadata.chunk_size + 2); } } // Get samples from chunks uint64_t bucket_count = 0; uint64_t last_bucket = 0; bool is_not_enough = true; + // Add these two samples at end when aggregator is TWA. + TSSample prev_sample, next_sample; + prev_sample.ts = TSSample::MAX_TIMESTAMP; + next_sample.ts = TSSample::MAX_TIMESTAMP; + const bool is_twa_aggregator = option.aggregator.type == TSAggregatorType::TWA; + for (; iter->Valid() && is_not_enough; iter->Next()) { chunk = CreateTSChunkFromData(iter->value()); auto it = chunk->CreateIterator(); @@ -1081,7 +1221,12 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke const bool not_time_filtered = option.filter_by_ts.empty() || option.filter_by_ts.count(sample->ts); const bool value_in_range = !option.filter_by_value || (sample->v >= option.filter_by_value->first && sample->v <= option.filter_by_value->second); - + // Record prev and next samples around the filtered range when aggregator is TWA + if (is_twa_aggregator) { + prev_sample = (sample->ts <= start_timestamp) ? *sample : prev_sample; + next_sample = + (sample->ts >= end_timestamp && next_sample.ts == TSSample::MAX_TIMESTAMP) ? *sample : next_sample; + } if (!in_time_range || !not_time_filtered || !value_in_range) { continue; } @@ -1103,8 +1248,21 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke } } + TWABounds twa_bounds; + if (is_twa_aggregator) { + // If the first element of the series is in first bucket, prev_sample might not get initialized. Similarly if the + // last element in the series is in last bucket, next_sample might not get initialized. If the series is empty, + // prev_sample and next_sample points to infinity (MAX_TIMESTAMP) + prev_sample = + prev_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.front() : prev_sample; + next_sample = + next_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.back() : next_sample; + twa_bounds.prev_sample = prev_sample; + twa_bounds.next_sample = next_sample; + } + // Process compaction logic - *res = AggregateSamplesByRangeOption(std::move(temp_results), option); + *res = AggregateSamplesByRangeOption(std::move(temp_results), option, twa_bounds); return rocksdb::Status::OK(); } diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h index 768b8a48f82..9ba4e3e84a3 100644 --- a/src/types/redis_timeseries.h +++ b/src/types/redis_timeseries.h @@ -53,6 +53,7 @@ enum class TSAggregatorType : uint8_t { STD_S = 10, VAR_P = 11, VAR_S = 12, + TWA = 13, }; inline bool IsIncrementalAggregatorType(TSAggregatorType type) { @@ -88,6 +89,11 @@ struct TSAggregator { double AggregateSamplesValue(nonstd::span samples) const; }; +struct TWABounds { + TSSample prev_sample; + TSSample next_sample; +}; + struct TSDownStreamMeta { TSAggregator aggregator; uint64_t latest_bucket_idx; diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc index 5937a9288df..b72ae26cc78 100644 --- a/tests/cppunit/types/timeseries_test.cc +++ b/tests/cppunit/types/timeseries_test.cc @@ -421,6 +421,178 @@ TEST_F(TimeSeriesTest, Range) { EXPECT_EQ(res.size(), 1); } +TEST_F(TimeSeriesTest, Twa) { + redis::TSCreateOption option; + option.labels = {{"type", "readings"}, {"name", "instrument"}}; + auto s = ts_db_->Create(*ctx_, key_, option); + EXPECT_TRUE(s.ok()); + + std::vector samples = {{12, 10}, {40, 12}, {380, 13}, {401, 18}, {595, 12}, {924, 13}}; + std::vector results2; + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, key_, samples, &results2); + EXPECT_TRUE(s.ok()); + + std::vector res; + redis::TSRangeOption range_opt; + range_opt.start_ts = 0; + range_opt.end_ts = TSSample::MAX_TIMESTAMP; + range_opt.aggregator.type = redis::TSAggregatorType::TWA; + range_opt.aggregator.bucket_duration = 1000; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res[0].ts, 0); + EXPECT_NEAR(res[0].v, 13.05482456, 1e-5); + + res.clear(); + range_opt.aggregator.bucket_duration = 100; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 5); + EXPECT_NEAR(res[0].v, 11.7419786, 1e-5); + EXPECT_NEAR(res[1].v, 13.382072, 1e-5); + EXPECT_NEAR(res[2].v, 16.483190, 1e-5); + EXPECT_NEAR(res[3].v, 13.3959984, 1e-5); + EXPECT_NEAR(res[4].v, 12.963525, 1e-5); + + res.clear(); + range_opt.aggregator.bucket_duration = 10; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 6); + EXPECT_NEAR(res[0].v, 10.28571, 1e-5); + EXPECT_NEAR(res[1].v, 12.01470, 1e-5); + EXPECT_NEAR(res[2].v, 14.19047, 1e-5); + EXPECT_NEAR(res[3].v, 17.86283, 1e-5); + EXPECT_NEAR(res[4].v, 12.04245, 1e-5); + EXPECT_NEAR(res[5].v, 12.99392, 1e-5); + + res.clear(); + range_opt.start_ts = 100; + range_opt.end_ts = 1000; + range_opt.aggregator.bucket_duration = 5; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 4); + EXPECT_NEAR(res[0].v, 13.59523, 1e-5); + EXPECT_NEAR(res[1].v, 17.92670, 1e-5); + EXPECT_NEAR(res[2].v, 12.00759, 1e-5); + EXPECT_NEAR(res[3].v, 12.99392, 1e-5); + + res.clear(); + range_opt.start_ts = 500; + range_opt.end_ts = 713; + range_opt.aggregator.bucket_duration = 10; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 1); + EXPECT_NEAR(res[0].v, 12.04245, 1e-5); + + // Two more datasets. + option.labels = {{"type", "numbers"}, {"distribution", "random"}}; + s = ts_db_->Create(*ctx_, "s:a", option); + EXPECT_TRUE(s.ok()); + s = ts_db_->Create(*ctx_, "s:b", option); + EXPECT_TRUE(s.ok()); + + samples = {{100, 20}, {200, 21}, {402, 18}, {600, 22}}; + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, "s:a", samples, &results2); + EXPECT_TRUE(s.ok()); + + samples = {{100, 15}, {300, 16}, {400, 17}, {402, 18}}; + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, "s:b", samples, &results2); + EXPECT_TRUE(s.ok()); + + res.clear(); + range_opt.start_ts = 200; + range_opt.end_ts = 512; + range_opt.aggregator.bucket_duration = 100; + s = ts_db_->Range(*ctx_, "s:a", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 2); + EXPECT_NEAR(res[0].v, 20.25742, 1e-5); + EXPECT_NEAR(res[1].v, 18.97039, 1e-5); + + res.clear(); + s = ts_db_->Range(*ctx_, "s:b", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res[0].ts, 300); + EXPECT_NEAR(res[0].v, 16.5, 1e-5); + EXPECT_EQ(res[1].ts, 400); + EXPECT_NEAR(res[1].v, 17.5, 1e-5); + + res.clear(); + range_opt.start_ts = 200; + range_opt.end_ts = 512; + range_opt.aggregator.bucket_duration = 1000; + s = ts_db_->Range(*ctx_, "s:a", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 1); + EXPECT_NEAR(res[0].v, 19.36289, 1e-5); + + res.clear(); + s = ts_db_->Range(*ctx_, "s:b", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_NEAR(res[0].v, 16.13861, 1e-5); + + // Test with FILTER_BY_TS + samples = {{15, 19}, {24, 13}, {25, 15}, {30, 14}, {35, 12}, {32, 19}}; + s = ts_db_->Create(*ctx_, "s:c", option); + EXPECT_TRUE(s.ok()); + results2.clear(); + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, "s:c", samples, &results2); + res.clear(); + std::set filtered_ts = {24, 30, 35}; + range_opt.filter_by_ts.insert(filtered_ts.begin(), filtered_ts.end()); + range_opt.start_ts = 0; + range_opt.end_ts = TSSample::MAX_TIMESTAMP; + range_opt.aggregator.bucket_duration = 10; + s = ts_db_->Range(*ctx_, "s:c", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 2); + EXPECT_EQ(res[0].ts, 20); + EXPECT_EQ(res[1].ts, 30); + EXPECT_NEAR(res[0].v, 13.5, 1e-5); + EXPECT_NEAR(res[1].v, 13, 1e-5); + + res.clear(); + range_opt.aggregator.bucket_duration = 100; + s = ts_db_->Range(*ctx_, "s:c", range_opt, &res); + EXPECT_EQ(res[0].ts, 0); + EXPECT_NEAR(res[0].v, 13.27272, 1e-5); + + // Test with FILTER_BY_VALUE + res.clear(); + range_opt.filter_by_ts.clear(); + range_opt.filter_by_value = std::make_pair(15, 19); + s = ts_db_->Range(*ctx_, "s:c", range_opt, &res); + EXPECT_EQ(res[0].ts, 0); + EXPECT_NEAR(res[0].v, 17, 1e-5); + + // Test with EMPTY filter + samples = {{100, 25}, {1000, 12}}; + res.clear(); + results2.clear(); + redis::TSRangeOption range_opt2; + range_opt2.aggregator.type = redis::TSAggregatorType::TWA; + range_opt2.start_ts = 0; + range_opt2.end_ts = TSSample::MAX_TIMESTAMP; + range_opt2.aggregator.bucket_duration = 100; + range_opt2.is_return_empty = true; + EXPECT_TRUE((ts_db_->MAdd(*ctx_, "s:c", samples, &results2)).ok()); + std::vector> results = { + {0, 17.7941176}, {100, 24.27777}, {200, 22.833333}, {300, 21.388888}, {400, 19.944444}, {500, 18.5}, + {600, 17.0555555}, {700, 15.611111}, {800, 14.1666666}, {900, 12.7222222}, {1000, 12}}; + EXPECT_TRUE((ts_db_->Range(*ctx_, "s:c", range_opt2, &res)).ok()); + for (size_t i = 0; i < results.size(); i++) { + EXPECT_EQ(res[i].ts, results[i].first); + EXPECT_NEAR(res[i].v, results[i].second, 1e-4); + } +} + TEST_F(TimeSeriesTest, Get) { redis::TSCreateOption option; auto s = ts_db_->Create(*ctx_, key_, option); diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go b/tests/gocase/unit/type/timeseries/timeseries_test.go index b0d9a1936e9..e488256ee7b 100644 --- a/tests/gocase/unit/type/timeseries/timeseries_test.go +++ b/tests/gocase/unit/type/timeseries/timeseries_test.go @@ -429,6 +429,20 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) { assert.Equal(t, 1, len(res)) }) + t.Run("TS.RANGE With TWA Aggregation", func(t *testing.T) { + first_key := "first_twa_key" + second_key := "second_twa_key" + require.NoError(t, rdb.Do(ctx, "ts.create", first_key).Err()) + require.NoError(t, rdb.Do(ctx, "ts.create", second_key).Err()) + require.NoError(t, rdb.Do(ctx, "ts.madd", first_key, 100, 20, first_key, 200, 21, first_key, 402, 18, first_key, 600, 22).Err()) + require.NoError(t, rdb.Do(ctx, "ts.madd", second_key, 100, 15, second_key, 300, 16, second_key, 400, 17, second_key, 402, 18).Err()) + + res := rdb.Do(ctx, "ts.range", first_key, 200, 512, "AGGREGATION", "twa", 100).Val().([]interface{}) + require.Equal(t, 2, len(res)) + val := math.Abs((res[0].([]interface{})[1].(float64)) - 20.25742) + assert.True(t, val < 1e-5) + }) + t.Run("TS.GET Basic", func(t *testing.T) { key := "test_get_key" require.NoError(t, rdb.Del(ctx, key).Err())