Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
eb1acb4
filter combiner has the OR filter logic I need
Tmonster Oct 27, 2023
ed3e381
Revert "filter combiner has the OR filter logic I need"
Tmonster Oct 27, 2023
ffe7d2b
re-enable pushdown or filters test
Tmonster Oct 30, 2023
e75df5e
make format fix
Tmonster Oct 30, 2023
98099d8
have an or filter pushed down, but need the logic to match the exact …
Tmonster Oct 30, 2023
11388d1
progress. I know how to continue
Tmonster Oct 30, 2023
caa0b3c
works but conjunction or statements need to be on the same column id.…
Tmonster Oct 31, 2023
6adb74c
works. can only push down or conjunctions on the same column id. a = …
Tmonster Oct 31, 2023
2f5b9b8
works on parquet as well
Tmonster Oct 31, 2023
cf8b684
Merge remote-tracking branch 'upstream/feature' into pushdown-or-oper…
Tmonster Nov 1, 2023
2ba51d2
tests are passing
Tmonster Nov 1, 2023
532f10f
test bools make format fix
Tmonster Nov 1, 2023
4b526ac
can_pushdown must be used
Tmonster Nov 1, 2023
05a1ae6
skip failing test'
Tmonster Nov 1, 2023
856154f
Merge branch 'feature' into pushdown-or-operators
Tmonster Nov 6, 2023
ec94828
fix test failures
Tmonster Nov 6, 2023
d6c7644
Merge branch 'feature' into pushdown-or-operators
Tmonster Nov 8, 2023
0724dce
have an idea. Need to inspect execute_conjunction and row_group.cpp. …
Tmonster Nov 14, 2023
a7aa314
clang tidy fix
Tmonster Nov 8, 2023
e7a3ef9
some more pushdown attempts. but still getting no where
Tmonster Nov 15, 2023
255363e
Merge branch 'main' into pushdown-or-operators
Tmonster Nov 20, 2023
4b0ffc7
yes, works
Tmonster Nov 22, 2023
4620275
Merge branch 'main' into pushdown-or-operators
Tmonster Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benchmark/imdb/init/load.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ CREATE TABLE movie_link AS SELECT * FROM read_parquet('https://github.com/duckdb
CREATE TABLE name AS SELECT * FROM read_parquet('https://github.com/duckdb/duckdb-data/releases/download/v1.0/job_name.parquet');
CREATE TABLE person_info AS SELECT * FROM read_parquet('https://github.com/duckdb/duckdb-data/releases/download/v1.0/job_person_info.parquet');
CREATE TABLE role_type AS SELECT * FROM read_parquet('https://github.com/duckdb/duckdb-data/releases/download/v1.0/job_role_type.parquet');
CREATE TABLE title AS SELECT * FROM read_parquet('https://github.com/duckdb/duckdb-data/releases/download/v1.0/job_title.parquet');
CREATE TABLE title AS SELECT * FROM read_parquet('https://github.com/duckdb/duckdb-data/releases/download/v1.0/job_title.parquet');

1 change: 1 addition & 0 deletions benchmark/imdb_plan_cost/queries/18a.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pragma threads=1;
SELECT MIN(mi.info) AS movie_budget,
MIN(mi_idx.info) AS movie_votes,
MIN(t.title) AS movie_title
Expand Down
6 changes: 5 additions & 1 deletion src/include/duckdb/planner/table_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "duckdb/common/types.hpp"
#include "duckdb/common/unordered_map.hpp"
#include "duckdb/common/enums/filter_propagate_result.hpp"
#include "duckdb/common/enums/expression_type.hpp"

namespace duckdb {
class BaseStatistics;
Expand Down Expand Up @@ -63,12 +64,14 @@ class TableFilter {
}
};

//!
class TableFilterSet {
public:
unordered_map<idx_t, unique_ptr<TableFilter>> filters;

public:
void PushFilter(idx_t table_index, unique_ptr<TableFilter> filter);
void PushFilter(idx_t column_index, unique_ptr<TableFilter> filter,
TableFilterType conjunction_type = TableFilterType::CONJUNCTION_AND);

bool Equals(TableFilterSet &other) {
if (filters.size() != other.filters.size()) {
Expand Down Expand Up @@ -97,6 +100,7 @@ class TableFilterSet {

void Serialize(Serializer &serializer) const;
static TableFilterSet Deserialize(Deserializer &deserializer);
static bool ExpressionSupportsPushdown(ExpressionType comparison);
};

} // namespace duckdb
108 changes: 99 additions & 9 deletions src/optimizer/filter_combiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "duckdb/planner/filter/constant_filter.hpp"
#include "duckdb/planner/filter/null_filter.hpp"
#include "duckdb/optimizer/optimizer.hpp"
#include "duckdb/common/types/value.hpp"

namespace duckdb {

Expand Down Expand Up @@ -391,20 +392,29 @@ bool FilterCombiner::HasFilters() {
// return zonemap_checks;
// }

bool TableFilterSet::ExpressionSupportsPushdown(ExpressionType comparison) {
return (comparison == ExpressionType::COMPARE_EQUAL || comparison == ExpressionType::COMPARE_GREATERTHAN ||
comparison == ExpressionType::COMPARE_GREATERTHANOREQUALTO ||
comparison == ExpressionType::COMPARE_LESSTHAN || comparison == ExpressionType::COMPARE_LESSTHANOREQUALTO);
}

static bool ValueTypeSupportsPushown(const Value &value) {
return TypeIsNumeric(value.type().InternalType()) || value.type().InternalType() == PhysicalType::VARCHAR ||
value.type().InternalType() == PhysicalType::BOOL;
}

static bool LeftConstValRightBoundColref(Expression &left, Expression &right) {
return (left.type == ExpressionType::VALUE_CONSTANT && right.type == ExpressionType::BOUND_COLUMN_REF);
}

TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_ids) {
TableFilterSet table_filters;
//! First, we figure the filters that have constant expressions that we can push down to the table scan
for (auto &constant_value : constant_values) {
if (!constant_value.second.empty()) {
auto filter_exp = equivalence_map.end();
if ((constant_value.second[0].comparison_type == ExpressionType::COMPARE_EQUAL ||
constant_value.second[0].comparison_type == ExpressionType::COMPARE_GREATERTHAN ||
constant_value.second[0].comparison_type == ExpressionType::COMPARE_GREATERTHANOREQUALTO ||
constant_value.second[0].comparison_type == ExpressionType::COMPARE_LESSTHAN ||
constant_value.second[0].comparison_type == ExpressionType::COMPARE_LESSTHANOREQUALTO) &&
(TypeIsNumeric(constant_value.second[0].constant.type().InternalType()) ||
constant_value.second[0].constant.type().InternalType() == PhysicalType::VARCHAR ||
constant_value.second[0].constant.type().InternalType() == PhysicalType::BOOL)) {
if (TableFilterSet::ExpressionSupportsPushdown(constant_value.second[0].comparison_type) &&
ValueTypeSupportsPushown(constant_value.second[0].constant)) {
//! Here we check if these filters are column references
filter_exp = equivalence_map.find(constant_value.first);
if (filter_exp->second.size() == 1 &&
Expand Down Expand Up @@ -433,6 +443,7 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_id
}
}
//! Here we look for LIKE or IN filters
vector<idx_t> remaining_filters_to_remove;
for (idx_t rem_fil_idx = 0; rem_fil_idx < remaining_filters.size(); rem_fil_idx++) {
auto &remaining_filter = remaining_filters[rem_fil_idx];
if (remaining_filter->expression_class == ExpressionClass::BOUND_FUNCTION) {
Expand Down Expand Up @@ -522,6 +533,14 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_id
// e.g. if we have x IN (1, 2, 3, 4, 5) we transform this into x >= 1 AND x <= 5
if (!type.IsIntegral()) {
continue;
// for (idx_t i = 1; i < func.children.size(); i++) {
// auto &child = func.children[i];
// auto &const_value_expr = child->Cast<BoundConstantExpression>();
// auto filter = make_uniq<ConstantFilter>(ExpressionType::COMPARE_EQUAL, const_value_expr.value);
// table_filters.PushFilter(column_index, std::move(filter), TableFilterType::CONJUNCTION_OR);
// }
// remaining_filters_to_remove.push_back(rem_fil_idx);
// continue;
}

bool can_simplify_in_clause = true;
Expand All @@ -546,6 +565,13 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_id
}
}
if (!can_simplify_in_clause) {
// make one big conjunction OR
for (auto &in_val : in_values) {
auto filter =
make_uniq<ConstantFilter>(ExpressionType::COMPARE_EQUAL, Value::Numeric(type, in_val));
table_filters.PushFilter(column_index, std::move(filter), TableFilterType::CONJUNCTION_OR);
}
remaining_filters_to_remove.push_back(rem_fil_idx);
continue;
}
auto lower_bound = make_uniq<ConstantFilter>(ExpressionType::COMPARE_GREATERTHANOREQUALTO,
Expand All @@ -556,10 +582,74 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_id
table_filters.PushFilter(column_index, std::move(upper_bound));
table_filters.PushFilter(column_index, make_uniq<IsNotNullFilter>());

remaining_filters.erase(remaining_filters.begin() + rem_fil_idx);
remaining_filters_to_remove.push_back(rem_fil_idx);
} else if (remaining_filter->type == ExpressionType::CONJUNCTION_OR) {
// continue;
unordered_set<idx_t> columns_that_need_and_null;
auto &conj = remaining_filter->Cast<BoundConjunctionExpression>();
auto or_filter = make_uniq<ConjunctionOrFilter>();
bool all_can_pushdown = true;
idx_t column_id = DConstants::INVALID_INDEX;
for (auto &expr : conj.children) {
// go through every child of the conjunction and check that it can be pushed down
// if not a direct comparison skip
if (expr->expression_class != ExpressionClass::BOUND_COMPARISON) {
all_can_pushdown = false;
continue;
}
auto &comp = expr->Cast<BoundComparisonExpression>();
BoundColumnRefExpression *colref = nullptr;
BoundConstantExpression *value = nullptr;
bool can_pushdown = false;
// if not a simple comparison between bound column index and constant value, skip
if (TableFilterSet::ExpressionSupportsPushdown(comp.type)) {
if (LeftConstValRightBoundColref(*comp.left, *comp.right)) {
value = &comp.left->Cast<BoundConstantExpression>();
colref = &comp.right->Cast<BoundColumnRefExpression>();
can_pushdown = true;
} else if (LeftConstValRightBoundColref(*comp.right, *comp.left)) {
value = &comp.right->Cast<BoundConstantExpression>();
colref = &comp.left->Cast<BoundColumnRefExpression>();
can_pushdown = true;
}
}
if (!can_pushdown) {
all_can_pushdown = false;
break;
}
auto column_index = column_ids[colref->binding.column_index];
if (column_index == COLUMN_IDENTIFIER_ROW_ID) {
all_can_pushdown = false;
break;
}
// if conjunction has two separate column_ids, skip
// (i.e a = 5 or b = 7)
if (column_id == DConstants::INVALID_INDEX) {
column_id = column_index;
}
if (column_id != column_index) {
all_can_pushdown = false;
break;
}
auto column_filter = make_uniq<ConstantFilter>(comp.type, value->value);
or_filter->child_filters.push_back(std::move(column_filter));
columns_that_need_and_null.insert(column_index);
}
// if all of the expressions in the conjunction or can be pushed down, then add add them to the
// table filters. Otherwise leave it as a filter on the read.
if (all_can_pushdown) {
table_filters.PushFilter(column_id, std::move(or_filter), TableFilterType::CONJUNCTION_OR);
// table_filters.PushFilter(column_id, make_uniq<IsNotNullFilter>(), TableFilterType::CONJUNCTION_AND);
remaining_filters_to_remove.push_back(rem_fil_idx);
}
}
}

for (int i = remaining_filters_to_remove.size() - 1; i >= 0; i--) {
auto remaining_filter_index = remaining_filters_to_remove.at(i);
remaining_filters.erase(remaining_filters.begin() + remaining_filter_index);
}

// GenerateORFilters(table_filters, column_ids);

return table_filters;
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ unique_ptr<LogicalOperator> Optimizer::Optimize(unique_ptr<LogicalOperator> plan
column_lifetime.VisitOperator(*plan);
});

// compress data based on statistics for materializing operators
// // compress data based on statistics for materializing operators
RunOptimizer(OptimizerType::COMPRESSED_MATERIALIZATION, [&]() {
CompressedMaterialization compressed_materialization(context, binder, std::move(statistics_map));
compressed_materialization.Compress(plan);
Expand Down
24 changes: 23 additions & 1 deletion src/planner/table_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,39 @@

namespace duckdb {

void TableFilterSet::PushFilter(idx_t column_index, unique_ptr<TableFilter> filter) {
void TableFilterSet::PushFilter(idx_t column_index, unique_ptr<TableFilter> filter, TableFilterType conjunction_type) {
auto entry = filters.find(column_index);
if (entry == filters.end()) {
// no filter yet: push the filter directly
filters[column_index] = std::move(filter);
} else {
// there is already a filter: AND it together
if (entry->second->filter_type == TableFilterType::CONJUNCTION_AND) {
if (conjunction_type == TableFilterType::CONJUNCTION_OR) {
throw InternalException("Conjunction AND should not be connecting conjunction ors.");
}
auto &and_filter = entry->second->Cast<ConjunctionAndFilter>();
and_filter.child_filters.push_back(std::move(filter));
} else if (entry->second->filter_type == TableFilterType::CONJUNCTION_OR) {
// Or filter connecting ands
if (conjunction_type == TableFilterType::CONJUNCTION_AND) {
auto and_filter = make_uniq<ConjunctionAndFilter>();
and_filter->child_filters.push_back(std::move(entry->second));
and_filter->child_filters.push_back(std::move(filter));
filters[column_index] = std::move(and_filter);
return;
}
auto &or_filter = entry->second->Cast<ConjunctionOrFilter>();
or_filter.child_filters.push_back(std::move(filter));
} else {
if (conjunction_type == TableFilterType::CONJUNCTION_OR) {
auto or_filter = make_uniq<ConjunctionOrFilter>();
or_filter->child_filters.push_back(std::move(entry->second));
or_filter->child_filters.push_back(std::move(filter));
filters[column_index] = std::move(or_filter);
return;
}
D_ASSERT(conjunction_type == TableFilterType::CONJUNCTION_AND);
auto and_filter = make_uniq<ConjunctionAndFilter>();
and_filter->child_filters.push_back(std::move(entry->second));
and_filter->child_filters.push_back(std::move(filter));
Expand Down
19 changes: 19 additions & 0 deletions src/storage/table/column_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,32 @@ idx_t ColumnSegment::FilterSelection(SelectionVector &sel, Vector &result, const
// similar to the CONJUNCTION_AND, but we need to take care of the SelectionVectors (OR all of them)
idx_t count_total = 0;
SelectionVector result_sel(approved_tuple_count);

// SelectionVector current_sel = result_sel;
// idx_t current_count = approved_tuple_count;
// idx_t result_count = 0;
//
// unique_ptr<SelectionVector> temp_true, temp_false;
// unique_ptr<SelectionVector> false_sel;
//
// temp_true = make_uniq<SelectionVector>(STANDARD_VECTOR_SIZE);
//
// temp_false = make_uniq<SelectionVector>(STANDARD_VECTOR_SIZE);

auto &conjunction_or = filter.Cast<ConjunctionOrFilter>();
for (auto &child_filter : conjunction_or.child_filters) {
SelectionVector temp_sel;
temp_sel.Initialize(sel);
idx_t temp_tuple_count = approved_tuple_count;
idx_t temp_count = FilterSelection(temp_sel, result, *child_filter, temp_tuple_count, mask);
// tuples passed, move them into the actual result vector
// if (temp_count > 0) {
// for (idx_t i = 0; i < temp_count; i++) {
// result_sel.set_index(result_count++, temp_true->get_index(i));
// }
// current_count -= temp_count;
//
// }
for (idx_t i = 0; i < temp_count; i++) {
auto new_idx = temp_sel.get_index(i);
bool is_new_idx = true;
Expand Down
44 changes: 40 additions & 4 deletions src/storage/table/row_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "duckdb/common/serializer/serializer.hpp"
#include "duckdb/common/serializer/deserializer.hpp"
#include "duckdb/common/serializer/binary_serializer.hpp"
#include "iostream"

namespace duckdb {

Expand Down Expand Up @@ -267,7 +268,7 @@ unique_ptr<RowGroup> RowGroup::AlterType(RowGroupCollection &new_collection, con
}

unique_ptr<RowGroup> RowGroup::AddColumn(RowGroupCollection &new_collection, ColumnDefinition &new_column,
ExpressionExecutor &executor, Expression &default_value, Vector &result) {
ExpressionExecutor &executor, Expression &default_value, Vector &result) {
Verify();

// construct a new column data for the new column
Expand Down Expand Up @@ -394,11 +395,12 @@ void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &s
const bool ALLOW_UPDATES = TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES &&
TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED;
auto table_filters = state.GetFilters();

const auto &column_ids = state.GetColumnIds();
auto adaptive_filter = state.GetAdaptiveFilter();
while (true) {
if (state.vector_index * STANDARD_VECTOR_SIZE >= state.max_row_group_row) {
// exceeded the amount of rows to scan
// exceeded the amount of rows to scan
return;
}
idx_t current_row = state.vector_index * STANDARD_VECTOR_SIZE;
Expand Down Expand Up @@ -462,12 +464,46 @@ void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &s
if (table_filters) {
D_ASSERT(adaptive_filter);
D_ASSERT(ALLOW_UPDATES);

// are we apply the whole OR conjunction filter at once. and then slice it.
// What we should do instead is, if the filter is a conjunction OR filter, apply every individual child
// update the selection vector with tuples that pass, then apply the next conjunction to the indexes that did not pass.
for (idx_t i = 0; i < table_filters->filters.size(); i++) {

auto tf_idx = adaptive_filter->permutation[i];
auto col_idx = column_ids[tf_idx];
auto &col_data = GetColumn(col_idx);
col_data.Select(transaction, state.vector_index, state.column_scans[tf_idx], result.data[tf_idx],
sel, approved_tuple_count, *table_filters->filters[tf_idx]);
auto &filter = table_filters->filters[tf_idx];

switch (filter->filter_type) {
case TableFilterType::CONJUNCTION_OR: {

auto &or_filter = filter->Cast<ConjunctionOrFilter>();
SelectionVector current_sel = sel;
SelectionVector *true_sel = nullptr;
SelectionVector *false_sel = nullptr;

// scan and flatten the vector.
// apply children one by one unti
for (idx_t i = 0; i < or_filter.child_filters.size(); i++) {
idx_t tcount = 0;
auto &child_filter_expression = or_filter.child_filters.at(i);
if (child_filter_expression->filter_type == TableFilterType::CONSTANT_COMPARISON) {
auto context = transaction.transaction.get()->context;
auto expression_executor = ExpressionExecutor(&context);
ColumnSegment::FilterSelection(sel, result.data.at(tf_idx), *child_filter_expression,
tcount, FlatVector::Validity(result.data.at(tf_idx)));
}
}


}
}
case TableFilterType::CONJUNCTION_AND:
default: {
col_data.Select(transaction, state.vector_index, state.column_scans[tf_idx], result.data[tf_idx],
sel, approved_tuple_count, *table_filters->filters[tf_idx]);
}
}
for (auto &table_filter : table_filters->filters) {
result.data[table_filter.first].Slice(sel, approved_tuple_count);
Expand Down
Loading