Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement origin for pandas resampling #1962

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
#include <vector>
#include <variant>

#include <folly/Poly.h>

#include <arcticdb/processing/processing_unit.hpp>
#include <arcticdb/column_store/string_pool.hpp>
#include <arcticdb/util/offset_string.hpp>
#include <arcticdb/stream/merge.hpp>

#include <arcticdb/processing/clause.hpp>
#include <arcticdb/pipeline/column_stats.hpp>
#include <arcticdb/pipeline/value_set.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/stream/segment_aggregator.hpp>
#include <ankerl/unordered_dense.h>
Expand Down Expand Up @@ -424,6 +421,43 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
return str_;
}

// Constructs a resampling clause for one side of bucket closure (template
// parameter closed_boundary, LEFT or RIGHT, mirroring pandas' `closed`).
// @param rule Pandas-style resampling rule string; stored verbatim and exposed via rule().
// @param label_boundary Which bucket boundary labels each output row.
// @param generate_bucket_boundaries Callback producing the bucket boundaries; injected
//        as a callback to avoid language-specific (pandas) dependencies this low in the
//        codebase (see the comment on the member in clause.hpp).
// @param offset Offset forwarded to the bucket-boundary generator.
// @param origin Pandas-style origin: either a named anchor (string) or an explicit
//        timestamp, forwarded to the bucket-boundary generator.
template<ResampleBoundary closed_boundary>
ResampleClause<closed_boundary>::ResampleClause(std::string rule,
        ResampleBoundary label_boundary,
        BucketGeneratorT&& generate_bucket_boundaries,
        timestamp offset,
        ResampleOrigin origin) :
        rule_(std::move(rule)),
        label_boundary_(label_boundary),
        generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
        offset_(offset),
        origin_(std::move(origin)) {
    // Resampling consumes time-bucketed input, rewrites the output descriptor
    // (aggregated columns replace the originals), and cannot be combined with
    // column selection; the symbol's current top-level index is kept.
    clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
    clause_info_.can_combine_with_column_selection_ = false;
    clause_info_.modifies_output_descriptor_ = true;
    clause_info_.index_ = KeepCurrentTopLevelIndex();
}

// Returns the metadata describing how this clause interacts with the processing
// pipeline (input structure, index handling, required input columns, etc.).
template<ResampleBoundary closed_boundary>
const ClauseInfo& ResampleClause<closed_boundary>::clause_info() const {
    return clause_info_;
}

// Stores shared ownership of the component manager used during process() to
// resolve entity IDs. Takes the shared_ptr by value and moves it into place.
template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
    component_manager_ = std::move(component_manager);
}

// Returns a copy of the pandas-style resampling rule string this clause was
// constructed with.
template<ResampleBoundary closed_boundary>
std::string ResampleClause<closed_boundary>::rule() const {
    return rule_;
}

// Records the caller-supplied date range over which to resample; overwrites any
// previously stored range.
template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_date_range(timestamp date_range_start, timestamp date_range_end) {
    date_range_ = std::make_optional<TimestampRange>(date_range_start, date_range_end);
}

template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_aggregations(const std::vector<NamedAggregator>& named_aggregators) {
clause_info_.input_columns_ = std::make_optional<std::unordered_set<std::string>>();
Expand Down Expand Up @@ -485,7 +519,7 @@ std::vector<std::vector<size_t>> ResampleClause<closed_boundary>::structure_for_
date_range_ = index_range;
}

bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
if (bucket_boundaries_.size() < 2) {
return {};
}
Expand Down Expand Up @@ -514,8 +548,7 @@ std::vector<std::vector<EntityId>> ResampleClause<closed_boundary>::structure_fo
}

date_range_ = std::make_optional<TimestampRange>(min_start_ts, max_end_ts);

bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
if (bucket_boundaries_.size() < 2) {
return {};
}
Expand Down
40 changes: 14 additions & 26 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

namespace arcticdb {

using ResampleOrigin = std::variant<std::string, timestamp>;

using RangesAndKey = pipelines::RangesAndKey;
using SliceAndKey = pipelines::SliceAndKey;

Expand Down Expand Up @@ -317,6 +319,7 @@ struct AggregationClause {

template<ResampleBoundary closed_boundary>
struct ResampleClause {
using BucketGeneratorT = std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, const ResampleOrigin&)>;
ClauseInfo clause_info_;
std::shared_ptr<ComponentManager> component_manager_;
ProcessingConfig processing_config_;
Expand All @@ -325,29 +328,22 @@ struct ResampleClause {
// This will contain the data range specified by the user (if any) intersected with the range of timestamps for the symbol
std::optional<TimestampRange> date_range_;
// Inject this as a callback in the ctor to avoid language-specific dependencies this low down in the codebase
std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)> generate_bucket_boundaries_;
BucketGeneratorT generate_bucket_boundaries_;
std::vector<timestamp> bucket_boundaries_;
std::vector<SortedAggregatorInterface> aggregators_;
std::string str_;
timestamp offset_;
ResampleOrigin origin_;

ResampleClause() = delete;

ARCTICDB_MOVE_COPY_DEFAULT(ResampleClause)

ResampleClause(const std::string& rule,
ResampleBoundary label_boundary,
std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)>&& generate_bucket_boundaries,
timestamp offset):
rule_(rule),
label_boundary_(label_boundary),
generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
offset_(offset) {
clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
clause_info_.can_combine_with_column_selection_ = false;
clause_info_.modifies_output_descriptor_ = true;
clause_info_.index_ = KeepCurrentTopLevelIndex();
}
ResampleClause(std::string rule,
ResampleBoundary label_boundary,
BucketGeneratorT&& generate_bucket_boundaries,
timestamp offset,
ResampleOrigin origin);

[[nodiscard]] std::vector<std::vector<size_t>> structure_for_processing(
std::vector<RangesAndKey>& ranges_and_keys);
Expand All @@ -356,27 +352,19 @@ struct ResampleClause {

[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] const ClauseInfo& clause_info() const {
return clause_info_;
}
[[nodiscard]] const ClauseInfo& clause_info() const;

void set_processing_config(const ProcessingConfig& processing_config);

void set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
component_manager_ = component_manager;
}
void set_component_manager(std::shared_ptr<ComponentManager> component_manager);

[[nodiscard]] std::string to_string() const;

[[nodiscard]] std::string rule() const {
return rule_;
}
[[nodiscard]] std::string rule() const;

void set_aggregations(const std::vector<NamedAggregator>& named_aggregators);

void set_date_range(timestamp date_range_start, timestamp date_range_end) {
date_range_.emplace(date_range_start, date_range_end);
}
void set_date_range(timestamp date_range_start, timestamp date_range_end);

std::vector<timestamp> generate_bucket_boundaries(timestamp first_ts,
timestamp last_ts,
Expand Down
65 changes: 65 additions & 0 deletions cpp/arcticdb/processing/clause_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,69 @@ std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& enti
return res;
}

// Groups row-slices into processing units aligned with resampling buckets.
// Starting from the per-row-slice structure given by structure_by_row_slice, merges
// adjacent row-slices whenever a bucket straddles the boundary between them, so that
// every bucket's values end up in a single processing unit.
// @param ranges Row-slice ranges for the symbol; slices falling entirely outside the
//        bucket range are erased from this vector in place.
// @param bucket_boundaries Sorted bucket boundary timestamps; adjacent pairs define
//        buckets, closed on the side given by the closed_boundary template parameter.
// @return One vector of indices into `ranges` per processing unit.
template<ResampleBoundary closed_boundary, typename T>
requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, RangesAndEntity>
std::vector<std::vector<size_t>> structure_by_time_bucket(
    std::vector<T>& ranges,
    const std::vector<timestamp>& bucket_boundaries) {
    // Drop row-slices that no bucket covers; they contribute nothing to the output.
    std::erase_if(ranges, [&bucket_boundaries](const T &range) {
        auto start_index = range.start_time();
        auto end_index = range.end_time();
        return index_range_outside_bucket_range<closed_boundary>(start_index, end_index, bucket_boundaries);
    });
    auto res = structure_by_row_slice(ranges);
    // Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
    // value of row-slice i and the first value of row-slice i+1
    // Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
    auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
    // Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
    for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
        auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
        advance_boundary_past_value<closed_boundary>(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
        // bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
        if (bucket_boundaries_it != bucket_boundaries.end()) {
            Bucket<closed_boundary> current_bucket{ *std::prev(bucket_boundaries_it), *bucket_boundaries_it };
            auto next_row_slice_it = std::next(res_it);
            while (next_row_slice_it != res.end()) {
                // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
                TimestampRange next_row_slice_timestamp_range{
                    ranges[next_row_slice_it->at(0)].start_time(),
                    ranges[next_row_slice_it->at(0)].end_time() };
                if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
                    // The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
                    res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
                    if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
                        // The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
                        next_row_slice_it = res.erase(next_row_slice_it);
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            }
            // This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
            if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
                res.erase(next_row_slice_it, res.end());
                break;
            }
            res_it = next_row_slice_it;
        }
    }
    return res;
}

// Explicit instantiations for the two range types and both bucket closures, so the
// template definition can live in this translation unit (declared in clause_utils.hpp).
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, RangesAndKey>(
    std::vector<RangesAndKey>& ranges,
    const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, RangesAndKey>(
    std::vector<RangesAndKey>& ranges,
    const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, RangesAndEntity>(
    std::vector<RangesAndEntity>& ranges,
    const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, RangesAndEntity>(
    std::vector<RangesAndEntity>& ranges,
    const std::vector<timestamp>& bucket_boundaries);

}
50 changes: 2 additions & 48 deletions cpp/arcticdb/processing/clause_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,54 +155,8 @@ void advance_boundary_past_value(const std::vector<timestamp>& bucket_boundaries
template<ResampleBoundary closed_boundary, typename T>
requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, RangesAndEntity>
std::vector<std::vector<size_t>> structure_by_time_bucket(
std::vector<T>& ranges,
const std::vector<timestamp>& bucket_boundaries) {
std::erase_if(ranges, [&bucket_boundaries](const T &range) {
auto start_index = range.start_time();
auto end_index = range.end_time();
return index_range_outside_bucket_range<closed_boundary>(start_index, end_index, bucket_boundaries);
});
auto res = structure_by_row_slice(ranges);
// Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
// value of row-slice i and the first value of row-slice i+1
// Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
// Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
advance_boundary_past_value<closed_boundary>(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
// bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
if (bucket_boundaries_it != bucket_boundaries.end()) {
Bucket<closed_boundary> current_bucket{*std::prev(bucket_boundaries_it), *bucket_boundaries_it};
auto next_row_slice_it = std::next(res_it);
while (next_row_slice_it != res.end()) {
// end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
TimestampRange next_row_slice_timestamp_range{
ranges[next_row_slice_it->at(0)].start_time(),
ranges[next_row_slice_it->at(0)].end_time()};
if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
// The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
// The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
next_row_slice_it = res.erase(next_row_slice_it);
} else {
break;
}
} else {
break;
}
}
// This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
res.erase(next_row_slice_it, res.end());
break;
}
res_it = next_row_slice_it;
}
}
return res;
}
std::vector<T>& ranges,
const std::vector<timestamp>& bucket_boundaries);

std::vector<std::vector<EntityId>> structure_by_row_slice(ComponentManager& component_manager, std::vector<std::vector<EntityId>>&& entity_ids_vec);

Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/processing/test/rapidcheck_resample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
using namespace arcticdb;

auto generate_bucket_boundaries(std::vector<timestamp>&& bucket_boundaries) {
return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp) mutable {
return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin) mutable {
return bucket_boundaries;
};
}
Expand Down Expand Up @@ -113,11 +113,11 @@ RC_GTEST_PROP(Resample, StructureForProcessing, ()) {
}

if (left_boundary_closed) {
ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0};
ResampleClause<ResampleBoundary::LEFT> resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0};
auto result = resample_clause.structure_for_processing(ranges_and_keys);
RC_ASSERT(expected_result == result);
} else {
ResampleClause<ResampleBoundary::RIGHT> resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0};
ResampleClause<ResampleBoundary::RIGHT> resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0};
auto result = resample_clause.structure_for_processing(ranges_and_keys);
RC_ASSERT(expected_result == result);
}
Expand Down
Loading
Loading