From fed7726e71c42b0f5d0d30414f106b948cd04637 Mon Sep 17 00:00:00 2001
From: Vasil Pashov
Date: Tue, 15 Oct 2024 18:26:10 +0300
Subject: [PATCH] Implement origin for pandas resampling

---
 cpp/arcticdb/processing/clause.cpp            |  45 ++++++--
 cpp/arcticdb/processing/clause.hpp            |  40 +++----
 cpp/arcticdb/processing/clause_utils.cpp      |  65 +++++++++++
 cpp/arcticdb/processing/clause_utils.hpp      |  50 +--------
 .../processing/test/rapidcheck_resample.cpp   |   6 +-
 .../processing/test/test_resample.cpp         |  30 +++---
 cpp/arcticdb/python/python_utils.hpp          |   1 -
 cpp/arcticdb/version/python_bindings.cpp      | 101 +++++++++++++-----
 python/arcticdb/version_store/processing.py   |  29 +++--
 .../arcticdb/version_store/test_resample.py   |  55 ++++++++--
 10 files changed, 280 insertions(+), 142 deletions(-)

diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp
index 5c58431253..68e5b5ac81 100644
--- a/cpp/arcticdb/processing/clause.cpp
+++ b/cpp/arcticdb/processing/clause.cpp
@@ -8,8 +8,6 @@
 #include
 #include
-#include
-
 #include
 #include
 #include
@@ -17,7 +15,6 @@
 #include
 #include
-#include
 #include
 #include
 #include
@@ -424,6 +421,43 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
     return str_;
 }
 
+template<ResampleBoundary closed_boundary>
+ResampleClause<closed_boundary>::ResampleClause(std::string rule,
+        ResampleBoundary label_boundary,
+        BucketGeneratorT&& generate_bucket_boundaries,
+        timestamp offset,
+        ResampleOrigin origin) :
+        rule_(std::move(rule)),
+        label_boundary_(label_boundary),
+        generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
+        offset_(offset),
+        origin_(std::move(origin)) {
+    clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
+    clause_info_.can_combine_with_column_selection_ = false;
+    clause_info_.modifies_output_descriptor_ = true;
+    clause_info_.index_ = KeepCurrentTopLevelIndex();
+}
+
+template<ResampleBoundary closed_boundary>
+const ClauseInfo& ResampleClause<closed_boundary>::clause_info() const {
+    return clause_info_;
+}
+
+template<ResampleBoundary closed_boundary>
+void ResampleClause<closed_boundary>::set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
+    component_manager_ = std::move(component_manager);
+}
+
+template<ResampleBoundary closed_boundary>
+std::string ResampleClause<closed_boundary>::rule() const {
+    return rule_;
+}
+
+template<ResampleBoundary closed_boundary>
+void ResampleClause<closed_boundary>::set_date_range(timestamp date_range_start, timestamp date_range_end) {
+    date_range_.emplace(date_range_start, date_range_end);
+}
+
 template<ResampleBoundary closed_boundary>
 void ResampleClause<closed_boundary>::set_aggregations(const std::vector<NamedAggregator>& named_aggregators) {
     clause_info_.input_columns_ = std::make_optional<std::unordered_set<std::string>>();
@@ -485,7 +519,7 @@ std::vector<std::vector<size_t>> ResampleClause<closed_boundary>::structure_for_
         date_range_ = index_range;
     }
 
-    bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
+    bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
     if (bucket_boundaries_.size() < 2) {
         return {};
     }
@@ -514,8 +548,7 @@ std::vector<std::vector<size_t>> ResampleClause<closed_boundary>::structure_fo
     }
 
     date_range_ = std::make_optional<TimestampRange>(min_start_ts, max_end_ts);
-
-    bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
+    bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
     if (bucket_boundaries_.size() < 2) {
         return {};
     }
diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp
index 901b622b79..134780d8db 100644
--- a/cpp/arcticdb/processing/clause.hpp
+++ b/cpp/arcticdb/processing/clause.hpp
@@ -36,6 +36,8 @@
 
 namespace arcticdb {
 
+using ResampleOrigin = std::variant<timestamp, std::string>;
+
 using RangesAndKey = pipelines::RangesAndKey;
 using SliceAndKey = pipelines::SliceAndKey;
@@ -317,6 +319,7 @@ struct AggregationClause {
 
 template<ResampleBoundary closed_boundary>
 struct ResampleClause {
+    using BucketGeneratorT = std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, const ResampleOrigin&)>;
     ClauseInfo clause_info_;
     std::shared_ptr<ComponentManager> component_manager_;
     ProcessingConfig processing_config_;
@@ -325,29 +328,22 @@ struct ResampleClause {
     // This will contain the data range specified by the user (if any) intersected with the range of timestamps for the symbol
     std::optional<TimestampRange> date_range_;
     // Inject this as a callback in the ctor to avoid language-specific dependencies this low down in the codebase
-    std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)> generate_bucket_boundaries_;
+    BucketGeneratorT generate_bucket_boundaries_;
     std::vector<timestamp> bucket_boundaries_;
     std::vector aggregators_;
     std::string str_;
     timestamp offset_;
+    ResampleOrigin origin_;
 
     ResampleClause() = delete;
 
     ARCTICDB_MOVE_COPY_DEFAULT(ResampleClause)
 
-    ResampleClause(const std::string& rule,
-            ResampleBoundary label_boundary,
-            std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)>&& generate_bucket_boundaries,
-            timestamp offset):
-        rule_(rule),
-        label_boundary_(label_boundary),
-        generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
-        offset_(offset) {
-        clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
-        clause_info_.can_combine_with_column_selection_ = false;
-        clause_info_.modifies_output_descriptor_ = true;
-        clause_info_.index_ = KeepCurrentTopLevelIndex();
-    }
+    ResampleClause(std::string rule,
+            ResampleBoundary label_boundary,
+            BucketGeneratorT&& generate_bucket_boundaries,
+            timestamp offset,
+            ResampleOrigin origin);
 
     [[nodiscard]] std::vector<std::vector<size_t>> structure_for_processing(
             std::vector<RangesAndKey>& ranges_and_keys);
@@ -356,27 +352,19 @@ struct ResampleClause {
 
     [[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;
 
-    [[nodiscard]] const ClauseInfo& clause_info() const {
-        return clause_info_;
-    }
+    [[nodiscard]] const ClauseInfo& clause_info() const;
 
     void set_processing_config(const ProcessingConfig& processing_config);
 
-    void set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
-        component_manager_ = component_manager;
-    }
+    void set_component_manager(std::shared_ptr<ComponentManager> component_manager);
 
     [[nodiscard]] std::string to_string() const;
 
-    [[nodiscard]] std::string rule() const {
-        return rule_;
-    }
+    [[nodiscard]] std::string rule() const;
 
     void set_aggregations(const std::vector<NamedAggregator>& named_aggregators);
 
-    void set_date_range(timestamp date_range_start, timestamp date_range_end) {
-        date_range_.emplace(date_range_start, date_range_end);
-    }
+    void set_date_range(timestamp date_range_start, timestamp date_range_end);
 
     std::vector<timestamp> generate_bucket_boundaries(timestamp first_ts,
                                                       timestamp last_ts,
diff --git a/cpp/arcticdb/processing/clause_utils.cpp b/cpp/arcticdb/processing/clause_utils.cpp
index 854ad5e39b..e581da43ce 100644
--- a/cpp/arcticdb/processing/clause_utils.cpp
+++ b/cpp/arcticdb/processing/clause_utils.cpp
@@ -81,4 +81,69 @@ std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& enti
     return res;
 }
 
+template<ResampleBoundary closed_boundary, typename T>
+requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, SliceAndKey>
+std::vector<std::vector<size_t>> structure_by_time_bucket(
+        std::vector<T>& ranges,
+        const std::vector<timestamp>& bucket_boundaries) {
+    std::erase_if(ranges, [&bucket_boundaries](const T &range) {
+        auto start_index = range.start_time();
+        auto end_index = range.end_time();
+        return index_range_outside_bucket_range(start_index, end_index, bucket_boundaries);
+    });
+    auto res = structure_by_row_slice(ranges);
+    // Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
+    // value of row-slice i and the first value of row-slice i+1
+    // Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
+    auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
+    // Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
+    for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
+        auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
+        advance_boundary_past_value(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
+        // bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
+        if (bucket_boundaries_it != bucket_boundaries.end()) {
+            Bucket<closed_boundary> current_bucket{ *std::prev(bucket_boundaries_it), *bucket_boundaries_it };
+            auto next_row_slice_it = std::next(res_it);
+            while (next_row_slice_it != res.end()) {
+                // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
+                TimestampRange next_row_slice_timestamp_range{
+                        ranges[next_row_slice_it->at(0)].start_time(),
+                        ranges[next_row_slice_it->at(0)].end_time() };
+                if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
+                    // The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
+                    res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
+                    if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
+                        // The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
+                        next_row_slice_it = res.erase(next_row_slice_it);
+                    } else {
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            }
+            // This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
+            if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
+                res.erase(next_row_slice_it, res.end());
+                break;
+            }
+            res_it = next_row_slice_it;
+        }
+    }
+    return res;
+}
+
+template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, RangesAndKey>(
+        std::vector<RangesAndKey>& ranges,
+        const std::vector<timestamp>& bucket_boundaries);
+template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, SliceAndKey>(
+        std::vector<SliceAndKey>& ranges,
+        const std::vector<timestamp>& bucket_boundaries);
+template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, RangesAndKey>(
+        std::vector<RangesAndKey>& ranges,
+        const std::vector<timestamp>& bucket_boundaries);
+template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, SliceAndKey>(
+        std::vector<SliceAndKey>& ranges,
+        const std::vector<timestamp>& bucket_boundaries);
+
 }
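
For intuition, here is a simplified Python sketch (not the ArcticDB implementation) of the merging rule above: each row-slice is reduced to a (start, end) nanosecond interval, buckets are taken as left-closed, and only the merge-on-shared-bucket step is modelled; the erasure of wholly subsumed slices and right-closed buckets are omitted.

import bisect

def structure_by_time_bucket(slices, boundaries):
    """slices: [(start, end)] sorted by start; boundaries: sorted bucket edges."""
    groups, current = [], []
    for s, e in slices:
        if current:
            prev_end = current[-1][1]
            # Find the bucket [boundaries[b-1], boundaries[b]) containing the
            # end of the previous slice, if any
            b = bisect.bisect_right(boundaries, prev_end)
            bucket = (boundaries[b - 1], boundaries[b]) if 0 < b < len(boundaries) else None
            # Merge when that bucket also contains the start of this slice
            if bucket and bucket[0] <= s < bucket[1]:
                current.append((s, e))
                continue
            groups.append(current)
        current = [(s, e)]
    if current:
        groups.append(current)
    return groups

# Slices (0, 10) and (10, 20) share the [0, 15) bucket, so they are merged:
print(structure_by_time_bucket([(0, 10), (10, 20), (40, 50)], [0, 15, 30, 45, 60]))
# [[(0, 10), (10, 20)], [(40, 50)]]
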
diff --git a/cpp/arcticdb/processing/clause_utils.hpp b/cpp/arcticdb/processing/clause_utils.hpp
index 7bf93221d0..978de8910d 100644
--- a/cpp/arcticdb/processing/clause_utils.hpp
+++ b/cpp/arcticdb/processing/clause_utils.hpp
@@ -155,54 +155,8 @@ void advance_boundary_past_value(const std::vector<timestamp>& bucket_boundaries
 template<ResampleBoundary closed_boundary, typename T>
 requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, SliceAndKey>
 std::vector<std::vector<size_t>> structure_by_time_bucket(
-        std::vector<T>& ranges,
-        const std::vector<timestamp>& bucket_boundaries) {
-    std::erase_if(ranges, [&bucket_boundaries](const T &range) {
-        auto start_index = range.start_time();
-        auto end_index = range.end_time();
-        return index_range_outside_bucket_range(start_index, end_index, bucket_boundaries);
-    });
-    auto res = structure_by_row_slice(ranges);
-    // Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
-    // value of row-slice i and the first value of row-slice i+1
-    // Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
-    auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
-    // Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
-    for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
-        auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
-        advance_boundary_past_value(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
-        // bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
-        if (bucket_boundaries_it != bucket_boundaries.end()) {
-            Bucket<closed_boundary> current_bucket{*std::prev(bucket_boundaries_it), *bucket_boundaries_it};
-            auto next_row_slice_it = std::next(res_it);
-            while (next_row_slice_it != res.end()) {
-                // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
-                TimestampRange next_row_slice_timestamp_range{
-                        ranges[next_row_slice_it->at(0)].start_time(),
-                        ranges[next_row_slice_it->at(0)].end_time()};
-                if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
-                    // The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
-                    res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
-                    if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
-                        // The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
-                        next_row_slice_it = res.erase(next_row_slice_it);
-                    } else {
-                        break;
-                    }
-                } else {
-                    break;
-                }
-            }
-            // This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
-            if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
-                res.erase(next_row_slice_it, res.end());
-                break;
-            }
-            res_it = next_row_slice_it;
-        }
-    }
-    return res;
-}
+        std::vector<T>& ranges,
+        const std::vector<timestamp>& bucket_boundaries);
 
 std::vector<std::vector<size_t>> structure_by_row_slice(ComponentManager& component_manager, std::vector<std::vector<EntityId>>&& entity_ids_vec);
diff --git a/cpp/arcticdb/processing/test/rapidcheck_resample.cpp b/cpp/arcticdb/processing/test/rapidcheck_resample.cpp
index 14285d7a91..9f7c3fe2b2 100644
--- a/cpp/arcticdb/processing/test/rapidcheck_resample.cpp
+++ b/cpp/arcticdb/processing/test/rapidcheck_resample.cpp
@@ -17,7 +17,7 @@
 using namespace arcticdb;
 
 auto generate_bucket_boundaries(std::vector<timestamp>&& bucket_boundaries) {
-    return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp) mutable {
+    return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin) mutable {
         return bucket_boundaries;
     };
 }
@@ -113,11 +113,11 @@ RC_GTEST_PROP(Resample, StructureForProcessing, ()) {
     }
 
     if (left_boundary_closed) {
-        ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0};
+        ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0};
         auto result = resample_clause.structure_for_processing(ranges_and_keys);
         RC_ASSERT(expected_result == result);
     } else {
-        ResampleClause<ResampleBoundary::RIGHT> resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0};
+        ResampleClause<ResampleBoundary::RIGHT> resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0};
         auto result = resample_clause.structure_for_processing(ranges_and_keys);
         RC_ASSERT(expected_result == result);
     }
diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp
index c2bfa40043..4279540b01 100644
--- a/cpp/arcticdb/processing/test/test_resample.cpp
+++ b/cpp/arcticdb/processing/test/test_resample.cpp
@@ -14,7 +14,7 @@
 using namespace arcticdb;
 
 auto generate_bucket_boundaries(std::vector<timestamp>&& bucket_boundaries) {
-    return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp) {
+    return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin) {
         return bucket_boundaries;
     };
 }
@@ -33,7 +33,7 @@ TEST(Resample, StructureForProcessingBasic) {
     // Insert into vector "out of order" to ensure structure_for_processing reorders correctly
     std::vector<RangesAndKey> ranges_and_keys{bottom, top};
 
-    ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0};
+    ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0, 0};
     auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys);
     ASSERT_EQ(ranges_and_keys.size(), 2);
     ASSERT_EQ(ranges_and_keys[0], top);
@@ -61,7 +61,7 @@ TEST(Resample, StructureForProcessingColumnSlicing) {
     // Insert into vector "out of order" to ensure structure_for_processing reorders correctly
     std::vector<RangesAndKey> ranges_and_keys{top_right, bottom_left, bottom_right, top_left};
 
-    ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0};
+    ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0, 0};
     auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys);
     ASSERT_EQ(ranges_and_keys.size(), 4);
     ASSERT_EQ(ranges_and_keys[0], top_left);
@@ -86,7 +86,7 @@ TEST(Resample, StructureForProcessingOverlap) {
     // Insert into vector "out of order" to ensure structure_for_processing reorders correctly
     std::vector<RangesAndKey> ranges_and_keys{bottom, top};
 
-    ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2500, 2999}), 0};
+    ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2500, 2999}), 0, 0};
     auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys);
     ASSERT_EQ(ranges_and_keys.size(), 2);
     ASSERT_EQ(ranges_and_keys[0], top);
@@ -113,7 +113,7 @@ TEST(Resample, StructureForProcessingSubsumed) {
     // Insert into vector "out of order" to ensure structure_for_processing reorders correctly
    std::vector<RangesAndKey> ranges_and_keys{bottom, middle, top};
 
-    ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 4500}), 0};
ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 4500}), 0, 0}; auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 3); ASSERT_EQ(ranges_and_keys[0], top); @@ -138,7 +138,7 @@ TEST(Resample, StructureForProcessingExactBoundary) { // Insert into vector "out of order" to ensure structure_for_processing reorders correctly std::vector ranges_and_keys{bottom, top}; - ResampleClause resample_clause_left{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0}; + ResampleClause resample_clause_left{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0, 0}; auto proc_unit_ids = resample_clause_left.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 2); ASSERT_EQ(ranges_and_keys[0], top); @@ -146,7 +146,7 @@ TEST(Resample, StructureForProcessingExactBoundary) { std::vector> expected_proc_unit_ids_left{{0}, {1}}; ASSERT_EQ(expected_proc_unit_ids_left, proc_unit_ids); - ResampleClause resample_clause_right{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0}; + ResampleClause resample_clause_right{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0, 0}; proc_unit_ids = resample_clause_right.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 2); ASSERT_EQ(ranges_and_keys[0], top); @@ -157,11 +157,11 @@ TEST(Resample, StructureForProcessingExactBoundary) { TEST(Resample, FindBuckets) { // Enough bucket boundaries to test all the interesting cases - ResampleClause resample_left("left", ResampleBoundary::LEFT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0); - ResampleClause resample_right("right", ResampleBoundary::RIGHT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0); + ResampleClause resample_left("left", ResampleBoundary::LEFT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0, 0); + ResampleClause resample_right("right", ResampleBoundary::RIGHT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0, 0); - resample_left.bucket_boundaries_ = resample_left.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0); - resample_right.bucket_boundaries_ = resample_right.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::RIGHT, 0); + resample_left.bucket_boundaries_ = resample_left.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0, 0); + resample_right.bucket_boundaries_ = resample_right.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::RIGHT, 0, 0); std::vector res; @@ -221,8 +221,8 @@ TEST(Resample, FindBuckets) { TEST(Resample, ProcessOneSegment) { auto component_manager = std::make_shared(); - ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-1, 2, 5}), 0); - resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0); + ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-1, 2, 5}), 0, 0); + resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0, 0); resample.date_range_ = {0, 5}; resample.set_component_manager(component_manager); resample.set_aggregations({{"sum", "sum_column", "sum_column"}}); @@ -266,8 +266,8 @@ TEST(Resample, ProcessOneSegment) { TEST(Resample, ProcessMultipleSegments) { auto component_manager = std::make_shared(); - ResampleClause 
resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-15, -5, 5, 6, 25, 35, 45, 46, 55, 65}), 0); - resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0); + ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-15, -5, 5, 6, 25, 35, 45, 46, 55, 65}), 0, 0); + resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0, 0); resample.date_range_ = {0, 51}; resample.set_component_manager(component_manager); resample.set_aggregations({{"sum", "sum_column", "sum_column"}}); diff --git a/cpp/arcticdb/python/python_utils.hpp b/cpp/arcticdb/python/python_utils.hpp index 10e33c6838..4c4ea638f0 100644 --- a/cpp/arcticdb/python/python_utils.hpp +++ b/cpp/arcticdb/python/python_utils.hpp @@ -8,7 +8,6 @@ #pragma once #include -#include #include #include #include diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index bd837a4e46..45f82a85ec 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -5,18 +5,16 @@ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. */ -#include +#include #include #include #include #include #include #include -#include #include #include #include -#include #include #include #include @@ -25,26 +23,87 @@ #include #include #include +#include + namespace arcticdb::version_store { +/// @param ts in nanoseconds +[[nodiscard]] static std::tm nanoseconds_to_tm(timestamp ts) { + // Uses boost to convert to std::tm because STL's functions don't work with pre-epoch timestamps + const timestamp seconds = ts / 1'000'000'000; + const boost::posix_time::ptime time_point = boost::posix_time::ptime(boost::gregorian::date(1970, 1, 1)) + boost::posix_time::seconds(seconds); + return boost::posix_time::to_tm(time_point); +} + +[[nodiscard]] static timestamp tm_to_nanoseconds(const std::tm& tm) { + // Uses boost to convert to std::tm because STL's functions don't work with pre-epoch timestamps + const boost::posix_time::ptime time_point = boost::posix_time::ptime_from_tm(tm); + return boost::posix_time::to_time_t(time_point) * 1'000'000'000; +} + +/// @param ts in nanoseconds +[[nodiscard]] static timestamp start_of_day_nanoseconds(timestamp ts) { + std::tm tm = nanoseconds_to_tm(ts); + tm.tm_hour = 0; + tm.tm_min = 0; + tm.tm_sec = 0; + return tm_to_nanoseconds(tm); +} + +[[nodiscard]] static timestamp end_of_day_nanoseconds(timestamp ts) { + std::tm tm = nanoseconds_to_tm(ts); + tm.tm_hour = 23; + tm.tm_min = 59; + tm.tm_sec = 59; + return tm_to_nanoseconds(tm) + 1'000'000'000; +} + [[nodiscard]] static std::pair compute_first_last_dates( timestamp start, timestamp end, timestamp rule, ResampleBoundary closed_boundary_arg, - timestamp offset + timestamp offset, + const ResampleOrigin& origin ) { - const timestamp ns_to_prev_offset_start = (start - offset) % rule; - const timestamp ns_to_prev_offset_end = (end - offset) % rule; + // Origin value formula from Pandas: + // https://github.com/pandas-dev/pandas/blob/68d9dcab5b543adb3bfe5b83563c61a9b8afae77/pandas/core/resample.py#L2564 + auto [origin_ns, origin_adjusted_start] = util::variant_match( + origin, + [start](timestamp o) -> std::pair {return {o, start}; }, + [&](const std::string& o) -> std::pair { + if (o == "epoch") { + return { 0, start }; + } else if (o == "start") { + return { 
+            } else if (o == "start_day") {
+                return { start_of_day_nanoseconds(start), start };
+            } else if (o == "end_day" || o == "end") {
+                const timestamp origin_last = o == "end" ? end : end_of_day_nanoseconds(end);
+                const timestamp bucket_count = (origin_last - start) / rule + (closed_boundary_arg == ResampleBoundary::LEFT);
+                const timestamp origin_ns = origin_last - bucket_count * rule;
+                return { origin_ns, origin_ns };
+            } else {
+                user_input::raise<ErrorCode::E_INVALID_USER_ARGUMENT>(
+                    "Invalid origin value {}. Supported values are: \"start\", \"start_day\", \"end\", \"end_day\", \"epoch\" or timestamp in nanoseconds",
+                    o);
+            }
+        }
+    );
+    origin_ns += offset;
+
+    const timestamp ns_to_prev_offset_start = (origin_adjusted_start - origin_ns) % rule;
+    const timestamp ns_to_prev_offset_end = (end - origin_ns) % rule;
+
     if (closed_boundary_arg == ResampleBoundary::RIGHT) {
         return {
-            ns_to_prev_offset_start > 0 ? start - ns_to_prev_offset_start : start - rule,
+            ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start - rule,
             ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end
         };
     } else {
         return {
-            ns_to_prev_offset_start > 0 ? start - ns_to_prev_offset_start : start,
+            ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start,
             ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end + rule
         };
     }
@@ -55,14 +114,14 @@ std::vector<timestamp> generate_buckets(
         timestamp start,
         timestamp end,
         std::string_view rule,
         ResampleBoundary closed_boundary_arg,
-        timestamp offset
+        timestamp offset,
+        const ResampleOrigin& origin
 ) {
-    timestamp rule_ns;
-    {
+    const timestamp rule_ns = [](std::string_view rule) {
         py::gil_scoped_acquire acquire_gil;
-        rule_ns = python_util::pd_to_offset(rule);
-    }
+        return python_util::pd_to_offset(rule);
+    }(rule);
-    const auto [start_with_offset, end_with_offset] = compute_first_last_dates(start, end, rule_ns, closed_boundary_arg, offset);
+    const auto [start_with_offset, end_with_offset] = compute_first_last_dates(start, end, rule_ns, closed_boundary_arg, offset, origin);
     const auto bucket_boundary_count = (end_with_offset - start_with_offset) / rule_ns + 1;
     std::vector<timestamp> res;
     res.reserve(bucket_boundary_count);
@@ -74,16 +133,10 @@
 
 template<ResampleBoundary closed_boundary>
 void declare_resample_clause(py::module& version) {
-    std::string class_name;
-    if constexpr (closed_boundary == ResampleBoundary::LEFT) {
-        class_name = "ResampleClauseLeftClosed";
-    } else {
-        // closed_boundary == ResampleBoundary::RIGHT
-        class_name = "ResampleClauseRightClosed";
-    }
-    py::class_<ResampleClause<closed_boundary>, std::shared_ptr<ResampleClause<closed_boundary>>>(version, class_name.c_str())
-        .def(py::init([](std::string rule, ResampleBoundary label_boundary, timestamp offset){
-            return ResampleClause<closed_boundary>(rule, label_boundary, generate_buckets, offset);
+    const char* class_name = closed_boundary == ResampleBoundary::LEFT ? "ResampleClauseLeftClosed" : "ResampleClauseRightClosed";
+    py::class_<ResampleClause<closed_boundary>, std::shared_ptr<ResampleClause<closed_boundary>>>(version, class_name)
+        .def(py::init([](std::string rule, ResampleBoundary label_boundary, timestamp offset, ResampleOrigin origin){
+            return ResampleClause<closed_boundary>(std::move(rule), label_boundary, generate_buckets, offset, std::move(origin));
         }))
         .def_property_readonly("rule", &ResampleClause<closed_boundary>::rule)
        .def("set_aggregations", [](ResampleClause<closed_boundary>& self,
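
For reference, the anchoring rule implemented by compute_first_last_dates/generate_buckets can be sketched in a few lines of Python. This is a simplified model, not the shipped code: everything is an integer count of nanoseconds, days are plain UTC days (the C++ goes through boost ptime/std::tm), and modulo operands are assumed non-negative (Python's % floors where C++ truncates).

DAY_NS = 24 * 60 * 60 * 1_000_000_000

def end_of_day(ts):
    return ts - ts % DAY_NS + DAY_NS

def bucket_boundaries(start, end, rule, closed, offset, origin):
    """closed: 'left' or 'right'; origin: int ns since epoch or one of
    'epoch', 'start', 'start_day', 'end', 'end_day'."""
    if isinstance(origin, int):
        origin_ns, adjusted_start = origin, start
    elif origin == "epoch":
        origin_ns, adjusted_start = 0, start
    elif origin == "start":
        origin_ns, adjusted_start = start, start
    elif origin == "start_day":
        origin_ns, adjusted_start = start - start % DAY_NS, start
    else:  # 'end' / 'end_day': anchor so a boundary lands exactly on origin_last
        origin_last = end if origin == "end" else end_of_day(end)
        buckets = (origin_last - start) // rule + (closed == "left")
        origin_ns = adjusted_start = origin_last - buckets * rule
    origin_ns += offset
    to_prev_start = (adjusted_start - origin_ns) % rule
    to_prev_end = (end - origin_ns) % rule
    if closed == "right":
        first = adjusted_start - (to_prev_start if to_prev_start > 0 else rule)
    else:
        first = adjusted_start - to_prev_start
    last = end + (rule - to_prev_end) if to_prev_end > 0 else (end if closed == "right" else end + rule)
    return list(range(first, last + 1, rule))

SECOND = 1_000_000_000
# Data from 00:00:30 to 00:02:30 in 2-minute left-closed buckets, epoch-anchored:
print(bucket_boundaries(30 * SECOND, 150 * SECOND, 120 * SECOND, "left", 0, "epoch"))
# [0, 120000000000, 240000000000], i.e. boundaries at 00:00, 00:02 and 00:04
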
"ResampleClauseLeftClosed" : "ResampleClauseRightClosed"; + py::class_, std::shared_ptr>>(version, class_name) + .def(py::init([](std::string rule, ResampleBoundary label_boundary, timestamp offset, ResampleOrigin origin){ + return ResampleClause(std::move(rule), label_boundary, generate_buckets, offset, std::move(origin)); })) .def_property_readonly("rule", &ResampleClause::rule) .def("set_aggregations", [](ResampleClause& self, diff --git a/python/arcticdb/version_store/processing.py b/python/arcticdb/version_store/processing.py index fa0784344b..9c9718c4c6 100644 --- a/python/arcticdb/version_store/processing.py +++ b/python/arcticdb/version_store/processing.py @@ -314,6 +314,7 @@ class PythonResampleClause: aggregations: Dict[str, Union[str, Tuple[str, str]]] = None # In nanosecods offset: int = 0 + origin: Union[str, pd.Timestamp] = "epoch" class QueryBuilder: @@ -584,7 +585,8 @@ def resample( rule: Union[str, pd.DateOffset], closed: Optional[str] = None, label: Optional[str] = None, - offset: Optional[Union[str, pd.Timedelta]] = None + offset: Optional[Union[str, pd.Timedelta]] = None, + origin: Union[str, pd.Timestamp] = 'epoch' ): """ Resample a symbol on the index. The symbol must be datetime indexed. Resample operations must be followed by @@ -631,7 +633,14 @@ def resample( offset: Optional[Union[str, pd.Timedelta]] default=None Offset the start of each bucket. Supported strings are the same as in `pd.Timedelta`. If offset is larger than rule then `offset` modulo `rule` is used as an offset. - + origin: Optional[Union[str, pd.Timestamp]] default='start_day' + The timestamp on which to adjust the grouping. Supported string are: + + * epoch: origin is 1970-01-01 + * start: origin is the first value of the timeseries + * start_day: origin is the first day at midnight of the timeseries + * end: origin is the last value of the timeseries + * end_day: origin is the ceiling midnight of the last day Returns ------- QueryBuilder @@ -736,6 +745,10 @@ def resample( else: offset_ns = 0 + if not (type(origin) is pd.Timestamp or origin in ["start", "end", "start_day", "end_day", "epoch"]): + raise UserInputException(f'Argument origin must be either of type pd.Timestamp or one of ["start", "end", "start_day", "end_day", "epoch"]. 
+            raise UserInputException(f'Argument origin must be either of type pd.Timestamp or one of ["start", "end", "start_day", "end_day", "epoch"]. Got {origin} instead')
+        if type(origin) is pd.Timestamp:
+            origin = origin.value
 
         # This set is documented here:
         # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.resample.html#pandas.Series.resample
         # and lifted directly from pandas.core.resample.TimeGrouper.__init__, and so is inherently fragile to upstream
@@ -744,15 +757,15 @@
         boundary_map = {
             "left": _ResampleBoundary.LEFT,
             "right": _ResampleBoundary.RIGHT,
-            None: _ResampleBoundary.RIGHT if rule in end_types else _ResampleBoundary.LEFT
+            None: _ResampleBoundary.RIGHT if rule in end_types or origin in ["end", "end_day"] else _ResampleBoundary.LEFT
         }
         check(closed in boundary_map.keys(), f"closed kwarg to resample must be 'left', 'right', or None, but received '{closed}'")
         check(label in boundary_map.keys(), f"label kwarg to resample must be 'left', 'right', or None, but received '{label}'")
         if boundary_map[closed] == _ResampleBoundary.LEFT:
-            self.clauses = self.clauses + [_ResampleClauseLeftClosed(rule, boundary_map[label], offset_ns)]
+            self.clauses = self.clauses + [_ResampleClauseLeftClosed(rule, boundary_map[label], offset_ns, origin)]
         else:
-            self.clauses = self.clauses + [_ResampleClauseRightClosed(rule, boundary_map[label], offset_ns)]
-        self._python_clauses = self._python_clauses + [PythonResampleClause(rule=rule, closed=boundary_map[closed], label=boundary_map[label], offset=offset_ns)]
+            self.clauses = self.clauses + [_ResampleClauseRightClosed(rule, boundary_map[label], offset_ns, origin)]
+        self._python_clauses = self._python_clauses + [PythonResampleClause(rule=rule, closed=boundary_map[closed], label=boundary_map[label], offset=offset_ns, origin=origin)]
 
         return self
 
@@ -930,9 +943,9 @@ def __setstate__(self, state):
                 self.clauses = self.clauses + [_AggregationClause(self.clauses[-1].grouping_column, python_clause.aggregations)]
             elif isinstance(python_clause, PythonResampleClause):
                 if python_clause.closed == _ResampleBoundary.LEFT:
-                    self.clauses = self.clauses + [_ResampleClauseLeftClosed(python_clause.rule, python_clause.label, python_clause.offset)]
+                    self.clauses = self.clauses + [_ResampleClauseLeftClosed(python_clause.rule, python_clause.label, python_clause.offset, python_clause.origin)]
                 else:
-                    self.clauses = self.clauses + [_ResampleClauseRightClosed(python_clause.rule, python_clause.label, python_clause.offset)]
+                    self.clauses = self.clauses + [_ResampleClauseRightClosed(python_clause.rule, python_clause.label, python_clause.offset, python_clause.origin)]
                 if python_clause.aggregations is not None:
                     self.clauses[-1].set_aggregations(python_clause.aggregations)
             elif isinstance(python_clause, PythonRowRangeClause):
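
A hypothetical end-to-end use of the new keyword (the library URI and symbol name are invented for illustration; QueryBuilder.resample/agg and Library.read are the APIs this patch extends):

import pandas as pd
from arcticdb import Arctic, QueryBuilder

lib = Arctic("lmdb:///tmp/origin_demo").get_library("demo", create_if_missing=True)
idx = pd.date_range("2025-01-01 10:00:33", periods=500, freq="10s")
lib.write("prices", pd.DataFrame({"price": range(len(idx))}, index=idx))

# Anchor 2-minute buckets the way pandas does for origin="end_day"; note that
# "end"/"end_day" flip the default closed side to "right", as in pandas
q = QueryBuilder()
q = q.resample("2min", origin="end_day").agg({"mean_price": ("price", "mean")})
print(lib.read("prices", query_builder=q).data.head())

# A concrete pd.Timestamp anchors the bins on that instant; it is converted to
# integer nanoseconds (Timestamp.value) before being handed to the C++ layer
q2 = QueryBuilder().resample("2min", origin=pd.Timestamp("2024-01-01")).agg({"n": ("price", "count")})
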
diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py
index 965f679340..551190be21 100644
--- a/python/tests/unit/arcticdb/version_store/test_resample.py
+++ b/python/tests/unit/arcticdb/version_store/test_resample.py
@@ -22,6 +22,9 @@
 
 ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"]
 
+def all_aggregations_dict(col):
+    return {f"to_{agg}": (col, agg) for agg in ALL_AGGREGATIONS}
+
 # Pandas recommended way to resample and exclude buckets with no index values, which is our behaviour
 # See https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#sparse-resampling
 def round(t, freq):
@@ -43,18 +46,24 @@ def generic_resample_test_with_empty_buckets(lib, sym, rule, aggregations, date_
     assert_frame_equal(expected, received, check_dtype=False)
 
-def generic_resample_test(lib, sym, rule, aggregations, date_range=None, closed=None, label=None, offset=None):
+def generic_resample_test(lib, sym, rule, aggregations, date_range=None, closed=None, label=None, offset=None, origin=None):
     # Pandas doesn't have a good date_range equivalent in resample, so just use read for that
     expected = lib.read(sym, date_range=date_range).data
     # Pandas 1.X needs None as the first argument to agg with named aggregators
     if PANDAS_VERSION >= Version("1.1.0"):
-        expected = expected.resample(rule, closed=closed, label=label, offset=offset).agg(None, **aggregations)
+        if origin:
+            expected = expected.resample(rule, closed=closed, label=label, offset=offset, origin=origin).agg(None, **aggregations)
+        else:
+            expected = expected.resample(rule, closed=closed, label=label, offset=offset).agg(None, **aggregations)
     else:
         expected = expected.resample(rule, closed=closed, label=label).agg(None, **aggregations)
     expected = expected.reindex(columns=sorted(expected.columns))
 
     q = QueryBuilder()
-    q = q.resample(rule, closed=closed, label=label, offset=offset).agg(aggregations)
+    if origin:
+        q = q.resample(rule, closed=closed, label=label, offset=offset, origin=origin).agg(aggregations)
+    else:
+        q = q.resample(rule, closed=closed, label=label, offset=offset).agg(aggregations)
     received = lib.read(sym, date_range=date_range, query_builder=q).data
     received = received.reindex(columns=sorted(received.columns))
 
@@ -548,10 +557,6 @@ def test_resampling_empty_type_column(lmdb_version_store_empty_types_v1):
 
 @pytest.mark.parametrize("closed", ["left", "right"])
 class TestResamplingOffset:
 
-    @staticmethod
-    def all_aggregations_dict(col):
-        return {f"to_{agg}": (col, agg) for agg in ALL_AGGREGATIONS}
-
     @pytest.mark.parametrize("offset", ("30s", pd.Timedelta(seconds=30)))
     def test_offset_smaller_than_freq(self, lmdb_version_store_v1, closed, offset):
         lib = lmdb_version_store_v1
@@ -564,7 +569,7 @@ def test_offset_smaller_than_freq(self, lmdb_version_store_v1, closed, offset):
         generic_resample_test(
             lib,
             sym,
             "2min",
-            self.all_aggregations_dict("col"),
+            all_aggregations_dict("col"),
             closed=closed,
             offset="30s"
         )
@@ -581,7 +586,7 @@ def test_offset_larger_than_freq(self, lmdb_version_store_v1, closed, offset):
         generic_resample_test(
             lib,
             sym,
             "2min",
-            self.all_aggregations_dict("col"),
+            all_aggregations_dict("col"),
             closed=closed,
             offset=offset
         )
@@ -603,7 +608,7 @@ def test_values_on_offset_boundary(self, lmdb_version_store_v1, closed, offset):
         generic_resample_test(
             lib,
             sym,
             "2min",
-            self.all_aggregations_dict("col"),
+            all_aggregations_dict("col"),
             closed=closed,
             offset=offset
         )
@@ -630,8 +635,36 @@ def test_with_date_range(self, lmdb_version_store_v1, closed, date_range, offset
         generic_resample_test(
             lib,
             sym,
             "2min",
-            self.all_aggregations_dict("col"),
+            all_aggregations_dict("col"),
             closed=closed,
             offset=offset,
             date_range=date_range
+        )
+
+@pytest.mark.parametrize("closed", ["left", "right"])
+class TestResamplingOrigin:
+
+    # Timestamps: pre start, between start and end, post end
+    @pytest.mark.parametrize(
+        "origin",
+        ["start", "start_day", "end", "end_day", "epoch", pd.Timestamp("2024-01-01"), pd.Timestamp("2025-01-01 15:00:00"), pd.Timestamp("2025-01-03 15:00:00")]
+    )
+    def test_origin(self, closed, origin, lmdb_version_store_v1):
+        lib = lmdb_version_store_v1
+        sym = "test_origin_special_values"
+        # Start and end are picked so that bucket_count * rule + start != end, deliberately
+        # exercising the bucket generation for the "end" and "end_day" origins
+        start = pd.Timestamp("2025-01-01 10:00:33")
+        end = pd.Timestamp("2025-01-02 12:00:20")
+        idx = pd.date_range(start, end, freq='10s')
+        df = pd.DataFrame({"col": range(len(idx))}, index=idx)
+        lib.write(sym, df)
+        generic_resample_test(
+            lib,
+            sym,
+            "2min",
+            all_aggregations_dict("col"),
+            closed=closed,
+            origin=origin
+        )
\ No newline at end of file
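
As a final piece of context, this is the pure-pandas behaviour the tests above compare against (the origin keyword needs pandas >= 1.1, and the "end"/"end_day" values need pandas >= 1.3):

import pandas as pd

idx = pd.date_range("2025-01-01 10:00:33", "2025-01-01 11:00:33", freq="10s")
df = pd.DataFrame({"col": range(len(idx))}, index=idx)

# epoch-anchored bins start on the even 2-minute grid; start-anchored bins
# begin at the first index value
print(df.resample("2min", origin="epoch").sum().index[0])  # 2025-01-01 10:00:00
print(df.resample("2min", origin="start").sum().index[0])  # 2025-01-01 10:00:33
# end-anchored bins are laid out backwards so that the final bin edge lands
# exactly on the last index value
print(df.resample("2min", origin="end").sum().index[-1])   # 2025-01-01 11:00:33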