Skip to content

Commit

Permalink
Implement origin for pandas resampling
Browse files Browse the repository at this point in the history
  • Loading branch information
vasil-pashov authored and Vasil Pashov committed Oct 29, 2024
1 parent 1a56956 commit fed7726
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 142 deletions.
45 changes: 39 additions & 6 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
#include <vector>
#include <variant>

#include <folly/Poly.h>

#include <arcticdb/processing/processing_unit.hpp>
#include <arcticdb/column_store/string_pool.hpp>
#include <arcticdb/util/offset_string.hpp>
#include <arcticdb/stream/merge.hpp>

#include <arcticdb/processing/clause.hpp>
#include <arcticdb/pipeline/column_stats.hpp>
#include <arcticdb/pipeline/value_set.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/stream/segment_aggregator.hpp>
#include <ankerl/unordered_dense.h>
Expand Down Expand Up @@ -424,6 +421,43 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
return str_;
}

template<ResampleBoundary closed_boundary>
ResampleClause<closed_boundary>::ResampleClause(std::string rule,
                                                ResampleBoundary label_boundary,
                                                BucketGeneratorT&& generate_bucket_boundaries,
                                                timestamp offset,
                                                ResampleOrigin origin) :
        rule_(std::move(rule)),
        label_boundary_(label_boundary),
        generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
        offset_(offset),
        origin_(std::move(origin)) {
    // Pipeline metadata for this clause: input must arrive grouped by time
    // bucket, the output descriptor is rewritten, the current top-level index
    // is retained, and the clause cannot be fused with column selection.
    clause_info_.index_ = KeepCurrentTopLevelIndex();
    clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
    clause_info_.modifies_output_descriptor_ = true;
    clause_info_.can_combine_with_column_selection_ = false;
}

// Returns the pipeline metadata configured in the constructor (input
// structure, index handling, descriptor modification flags). The reference
// remains valid for the lifetime of this clause.
template<ResampleBoundary closed_boundary>
const ClauseInfo& ResampleClause<closed_boundary>::clause_info() const {
    return clause_info_;
}

// Installs the component manager used to resolve entity IDs during process().
// Takes shared ownership; the caller's pointer is moved from.
template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
    component_manager_ = std::move(component_manager);
}

// Returns a copy of the resampling rule string supplied at construction
// (e.g. a pandas-style frequency rule such as "1H" — see commit context).
template<ResampleBoundary closed_boundary>
std::string ResampleClause<closed_boundary>::rule() const {
    return rule_;
}

// Records the user-requested date range [date_range_start, date_range_end],
// replacing any previously set range. structure_for_processing later
// intersects this with the symbol's actual index range.
template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_date_range(timestamp date_range_start, timestamp date_range_end) {
    date_range_.emplace(date_range_start, date_range_end);
}

template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_aggregations(const std::vector<NamedAggregator>& named_aggregators) {
clause_info_.input_columns_ = std::make_optional<std::unordered_set<std::string>>();
Expand Down Expand Up @@ -485,7 +519,7 @@ std::vector<std::vector<size_t>> ResampleClause<closed_boundary>::structure_for_
date_range_ = index_range;
}

bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
if (bucket_boundaries_.size() < 2) {
return {};
}
Expand Down Expand Up @@ -514,8 +548,7 @@ std::vector<std::vector<EntityId>> ResampleClause<closed_boundary>::structure_fo
}

date_range_ = std::make_optional<TimestampRange>(min_start_ts, max_end_ts);

bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
if (bucket_boundaries_.size() < 2) {
return {};
}
Expand Down
40 changes: 14 additions & 26 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

namespace arcticdb {

using ResampleOrigin = std::variant<std::string, timestamp>;

using RangesAndKey = pipelines::RangesAndKey;
using SliceAndKey = pipelines::SliceAndKey;

Expand Down Expand Up @@ -317,6 +319,7 @@ struct AggregationClause {

template<ResampleBoundary closed_boundary>
struct ResampleClause {
using BucketGeneratorT = std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, const ResampleOrigin&)>;
ClauseInfo clause_info_;
std::shared_ptr<ComponentManager> component_manager_;
ProcessingConfig processing_config_;
Expand All @@ -325,29 +328,22 @@ struct ResampleClause {
// This will contain the data range specified by the user (if any) intersected with the range of timestamps for the symbol
std::optional<TimestampRange> date_range_;
// Inject this as a callback in the ctor to avoid language-specific dependencies this low down in the codebase
std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)> generate_bucket_boundaries_;
BucketGeneratorT generate_bucket_boundaries_;
std::vector<timestamp> bucket_boundaries_;
std::vector<SortedAggregatorInterface> aggregators_;
std::string str_;
timestamp offset_;
ResampleOrigin origin_;

ResampleClause() = delete;

ARCTICDB_MOVE_COPY_DEFAULT(ResampleClause)

ResampleClause(const std::string& rule,
ResampleBoundary label_boundary,
std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)>&& generate_bucket_boundaries,
timestamp offset):
rule_(rule),
label_boundary_(label_boundary),
generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
offset_(offset) {
clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
clause_info_.can_combine_with_column_selection_ = false;
clause_info_.modifies_output_descriptor_ = true;
clause_info_.index_ = KeepCurrentTopLevelIndex();
}
ResampleClause(std::string rule,
ResampleBoundary label_boundary,
BucketGeneratorT&& generate_bucket_boundaries,
timestamp offset,
ResampleOrigin origin);

[[nodiscard]] std::vector<std::vector<size_t>> structure_for_processing(
std::vector<RangesAndKey>& ranges_and_keys);
Expand All @@ -356,27 +352,19 @@ struct ResampleClause {

[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] const ClauseInfo& clause_info() const {
return clause_info_;
}
[[nodiscard]] const ClauseInfo& clause_info() const;

void set_processing_config(const ProcessingConfig& processing_config);

void set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
component_manager_ = component_manager;
}
void set_component_manager(std::shared_ptr<ComponentManager> component_manager);

[[nodiscard]] std::string to_string() const;

[[nodiscard]] std::string rule() const {
return rule_;
}
[[nodiscard]] std::string rule() const;

void set_aggregations(const std::vector<NamedAggregator>& named_aggregators);

void set_date_range(timestamp date_range_start, timestamp date_range_end) {
date_range_.emplace(date_range_start, date_range_end);
}
void set_date_range(timestamp date_range_start, timestamp date_range_end);

std::vector<timestamp> generate_bucket_boundaries(timestamp first_ts,
timestamp last_ts,
Expand Down
65 changes: 65 additions & 0 deletions cpp/arcticdb/processing/clause_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,69 @@ std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& enti
return res;
}

// Groups row-slices into processing units aligned to resampling buckets.
// First drops every range that lies entirely outside the bucket range, then
// merges adjacent row-slices whenever one bucket straddles their boundary, so
// each output element holds all the slices a bucket needs to be computed.
// T is constrained to RangesAndKey (structuring before reads) and
// RangesAndEntity (structuring after reads); both expose start_time()/end_time().
template<ResampleBoundary closed_boundary, typename T>
requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, RangesAndEntity>
std::vector<std::vector<size_t>> structure_by_time_bucket(
        std::vector<T>& ranges,
        const std::vector<timestamp>& bucket_boundaries) {
    // Discard row-slices with no index values inside any bucket up front, so
    // the merging loop below can assume each slice overlaps some bucket.
    std::erase_if(ranges, [&bucket_boundaries](const T &range) {
        auto start_index = range.start_time();
        auto end_index = range.end_time();
        return index_range_outside_bucket_range<closed_boundary>(start_index, end_index, bucket_boundaries);
    });
    auto res = structure_by_row_slice(ranges);
    // Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
    // value of row-slice i and the first value of row-slice i+1
    // Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
    auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
    // Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
    for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
        auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
        advance_boundary_past_value<closed_boundary>(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
        // bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
        if (bucket_boundaries_it != bucket_boundaries.end()) {
            Bucket<closed_boundary> current_bucket{ *std::prev(bucket_boundaries_it), *bucket_boundaries_it };
            auto next_row_slice_it = std::next(res_it);
            while (next_row_slice_it != res.end()) {
                // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
                TimestampRange next_row_slice_timestamp_range{
                    ranges[next_row_slice_it->at(0)].start_time(),
                    ranges[next_row_slice_it->at(0)].end_time() };
                if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
                    // The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
                    res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
                    if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
                        // The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
                        next_row_slice_it = res.erase(next_row_slice_it);
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            }
            // This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
            if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
                res.erase(next_row_slice_it, res.end());
                break;
            }
            res_it = next_row_slice_it;
        }
        // NOTE(review): res_it only advances inside the branch above. If
        // bucket_boundaries_it could ever equal end() here the loop would not
        // terminate; the erase_if above appears to guarantee every remaining
        // slice overlaps a bucket — confirm advance_boundary_past_value cannot
        // land on end() for a retained slice.
    }
    return res;
}

// Explicit instantiations: structure_by_time_bucket is declared in
// clause_utils.hpp but now defined in this translation unit, so instantiate it
// for every (closed boundary, range type) combination used by ResampleClause.
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, RangesAndKey>(
    std::vector<RangesAndKey>& ranges,
    const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, RangesAndKey>(
    std::vector<RangesAndKey>& ranges,
    const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, RangesAndEntity>(
    std::vector<RangesAndEntity>& ranges,
    const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, RangesAndEntity>(
    std::vector<RangesAndEntity>& ranges,
    const std::vector<timestamp>& bucket_boundaries);

}
50 changes: 2 additions & 48 deletions cpp/arcticdb/processing/clause_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,54 +155,8 @@ void advance_boundary_past_value(const std::vector<timestamp>& bucket_boundaries
template<ResampleBoundary closed_boundary, typename T>
requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, RangesAndEntity>
std::vector<std::vector<size_t>> structure_by_time_bucket(
std::vector<T>& ranges,
const std::vector<timestamp>& bucket_boundaries) {
std::erase_if(ranges, [&bucket_boundaries](const T &range) {
auto start_index = range.start_time();
auto end_index = range.end_time();
return index_range_outside_bucket_range<closed_boundary>(start_index, end_index, bucket_boundaries);
});
auto res = structure_by_row_slice(ranges);
// Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
// value of row-slice i and the first value of row-slice i+1
// Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
// Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
advance_boundary_past_value<closed_boundary>(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
// bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
if (bucket_boundaries_it != bucket_boundaries.end()) {
Bucket<closed_boundary> current_bucket{*std::prev(bucket_boundaries_it), *bucket_boundaries_it};
auto next_row_slice_it = std::next(res_it);
while (next_row_slice_it != res.end()) {
// end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
TimestampRange next_row_slice_timestamp_range{
ranges[next_row_slice_it->at(0)].start_time(),
ranges[next_row_slice_it->at(0)].end_time()};
if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
// The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
// The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
next_row_slice_it = res.erase(next_row_slice_it);
} else {
break;
}
} else {
break;
}
}
// This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
res.erase(next_row_slice_it, res.end());
break;
}
res_it = next_row_slice_it;
}
}
return res;
}
std::vector<T>& ranges,
const std::vector<timestamp>& bucket_boundaries);

std::vector<std::vector<EntityId>> structure_by_row_slice(ComponentManager& component_manager, std::vector<std::vector<EntityId>>&& entity_ids_vec);

Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/processing/test/rapidcheck_resample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
using namespace arcticdb;

// Builds a stub bucket-boundary generator for the rapidcheck tests: it
// ignores every resampling parameter (range, rule, closedness, offset,
// origin) and always hands back the pre-computed boundaries it captured.
auto generate_bucket_boundaries(std::vector<timestamp>&& bucket_boundaries) {
    return [boundaries = std::move(bucket_boundaries)](
            timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin) mutable {
        return boundaries;
    };
}
Expand Down Expand Up @@ -113,11 +113,11 @@ RC_GTEST_PROP(Resample, StructureForProcessing, ()) {
}

if (left_boundary_closed) {
ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0};
ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0};
auto result = resample_clause.structure_for_processing(ranges_and_keys);
RC_ASSERT(expected_result == result);
} else {
ResampleClause<ResampleBoundary::RIGHT> resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0};
ResampleClause<ResampleBoundary::RIGHT> resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0};
auto result = resample_clause.structure_for_processing(ranges_and_keys);
RC_ASSERT(expected_result == result);
}
Expand Down
Loading

0 comments on commit fed7726

Please sign in to comment.