Skip to content

Commit

Permalink
Add other part of unnest
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN committed Dec 25, 2024
1 parent 416cb5c commit ba245a6
Show file tree
Hide file tree
Showing 19 changed files with 356 additions and 4 deletions.
48 changes: 48 additions & 0 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import physical_merge_aggregate;
import status;
import physical_operator_type;
import physical_read_cache;
import physical_unnest;

import explain_logical_plan;
import logical_show;
Expand Down Expand Up @@ -317,6 +318,10 @@ void ExplainPhysicalPlan::Explain(const PhysicalOperator *op, SharedPtr<Vector<S
Explain(static_cast<const PhysicalReadCache *>(op), result, intent_size);
break;
}
case PhysicalOperatorType::kUnnest: {
Explain(static_cast<const PhysicalUnnest *>(op), result, intent_size);
break;
}
default: {
String error_message = "Unexpected physical operator type";
UnrecoverableError(error_message);
Expand Down Expand Up @@ -2769,4 +2774,47 @@ void ExplainPhysicalPlan::Explain(const PhysicalReadCache *read_cache_node, Shar
result->emplace_back(MakeShared<String>(output_columns));
}

void ExplainPhysicalPlan::Explain(const PhysicalUnnest *unnest_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
String unnest_node_header_str;
if (intent_size != 0) {
unnest_node_header_str = String(intent_size - 2, ' ') + "-> Unnest ";
} else {
unnest_node_header_str = "\"-> Unnest \" ";
}

unnest_node_header_str += "(" + std::to_string(unnest_node->node_id()) + ")";
result->emplace_back(MakeShared<String>(unnest_node_header_str));

// Unnest expression
{
String unnest_expression_str = String(intent_size, ' ') + " - unnest expression: [";
Vector<SharedPtr<BaseExpression>> expression_list = unnest_node->expression_list();
SizeT expression_count = expression_list.size();
for (SizeT idx = 0; idx < expression_count - 1; ++idx) {
ExplainLogicalPlan::Explain(expression_list[idx].get(), unnest_expression_str);
unnest_expression_str += ", ";
}
ExplainLogicalPlan::Explain(expression_list.back().get(), unnest_expression_str);
unnest_expression_str += "]";
result->emplace_back(MakeShared<String>(unnest_expression_str));
}

// unnest expression
String filter_str = String(intent_size, ' ') + " - filter: ";

result->emplace_back(MakeShared<String>(filter_str));

// Output column
{
String output_columns_str = String(intent_size, ' ') + " - output columns: [";
SharedPtr<Vector<String>> output_columns = unnest_node->GetOutputNames();
SizeT column_count = output_columns->size();
for (SizeT idx = 0; idx < column_count - 1; ++idx) {
output_columns_str += output_columns->at(idx) + ", ";
}
output_columns_str += output_columns->back() + "]";
result->emplace_back(MakeShared<String>(output_columns_str));
}
}

} // namespace infinity
3 changes: 3 additions & 0 deletions src/executor/explain_physical_plan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import physical_fusion;
import physical_merge_aggregate;
import physical_match_sparse_scan;
import physical_read_cache;
import physical_unnest;

export module explain_physical_plan;

Expand Down Expand Up @@ -184,6 +185,8 @@ public:
static void Explain(const PhysicalMergeAggregate *fusion_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalReadCache *read_cache_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalUnnest *unnest_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);
};

} // namespace infinity
1 change: 1 addition & 0 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
}
case PhysicalOperatorType::kParallelAggregate:
case PhysicalOperatorType::kFilter:
case PhysicalOperatorType::kUnnest:
case PhysicalOperatorType::kHash:
case PhysicalOperatorType::kLimit: {
if (phys_op->left() == nullptr) {
Expand Down
48 changes: 48 additions & 0 deletions src/executor/operator/physical_unnest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

module physical_unnest;

import stl;
import txn;
import query_context;
import table_def;
import data_table;

import physical_operator_type;
import operator_state;
import expression_state;
import expression_selector;
import data_block;
import logger;
import third_party;

import infinity_exception;

namespace infinity {

void PhysicalUnnest::Init() {
}

bool PhysicalUnnest::Execute(QueryContext *, OperatorState *operator_state) {
OperatorState* prev_op_state = operator_state->prev_op_state_;
if (prev_op_state->Complete()) {
operator_state->SetComplete();
}
return true;
}

} // namespace infinity
62 changes: 62 additions & 0 deletions src/executor/operator/physical_unnest.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module physical_unnest;

import stl;

import query_context;
import operator_state;
import physical_operator;
import physical_operator_type;
import base_expression;
import data_table;
import load_meta;
import infinity_exception;
import internal_types;
import data_type;

namespace infinity {

export class PhysicalUnnest : public PhysicalOperator {
public:
explicit PhysicalUnnest(u64 id,
UniquePtr<PhysicalOperator> left,
Vector<SharedPtr<BaseExpression>> expression_list,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kUnnest, std::move(left), nullptr, id, load_metas), expression_list_(std::move(expression_list)) {}

~PhysicalUnnest() override = default;

void Init() override;

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

inline SharedPtr<Vector<String>> GetOutputNames() const final { return PhysicalCommonFunctionUsingLoadMeta::GetOutputNames(*this); }

inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return PhysicalCommonFunctionUsingLoadMeta::GetOutputTypes(*this); }

SizeT TaskletCount() override { return left_->TaskletCount(); }

Vector<SharedPtr<BaseExpression>> expression_list() const { return expression_list_; }

private:
Vector<SharedPtr<BaseExpression>> expression_list_;

SharedPtr<DataTable> input_table_{};
};

} // namespace infinity
5 changes: 5 additions & 0 deletions src/executor/operator_state.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ export struct CompactFinishOperatorState : public OperatorState {
SharedPtr<CompactStateData> compact_state_data_{};
};

// Unnest
export struct UnnestOperatorState : public OperatorState {
inline explicit UnnestOperatorState() : OperatorState(PhysicalOperatorType::kUnnest) {}
};

// Source
export enum class SourceStateType {
kInvalid,
Expand Down
2 changes: 2 additions & 0 deletions src/executor/physical_operator_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ String PhysicalOperatorToString(PhysicalOperatorType type) {
return "CreateIndexFinish";
case PhysicalOperatorType::kReadCache:
return "ReadCache";
case PhysicalOperatorType::kUnnest:
return "Unnest";
}

Status status = Status::NotSupport("Unknown physical operator type");
Expand Down
1 change: 1 addition & 0 deletions src/executor/physical_operator_type.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export enum class PhysicalOperatorType : i8 {

kTableScan,
kFilter,
kUnnest,
kIndexScan,
kDummyScan,
kKnnScan,
Expand Down
26 changes: 26 additions & 0 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import physical_create_index_prepare;
import physical_create_index_do;
import physical_create_index_finish;
import physical_read_cache;
import physical_unnest;

import logical_node;
import logical_node_type;
Expand Down Expand Up @@ -130,6 +131,7 @@ import logical_match_tensor_scan;
import logical_match_sparse_scan;
import logical_fusion;
import logical_read_cache;
import logical_unnest;

import value;
import value_expression;
Expand Down Expand Up @@ -339,6 +341,10 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildPhysicalOperator(const SharedP
result = BuildReadCache(logical_operator);
break;
}
case LogicalNodeType::kUnnest: {
result = BuildUnnest(logical_operator);
break;
}
default: {
String error_message = fmt::format("Unknown logical node type: {}", logical_operator->name());
UnrecoverableError(error_message);
Expand Down Expand Up @@ -1190,6 +1196,26 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildReadCache(const SharedPtr<Logi
logical_read_cache->is_min_heap_);
}

UniquePtr<PhysicalOperator> PhysicalPlanner::BuildUnnest(const SharedPtr<LogicalNode> &logical_operator) const {
auto input_logical_node = logical_operator->left_node();
if (input_logical_node.get() == nullptr) {
String error_message = "Logical filter node has no input node.";
UnrecoverableError(error_message);
}
if (logical_operator->right_node().get() != nullptr) {
String error_message = "Logical filter node shouldn't have right child.";
UnrecoverableError(error_message);
}

auto input_physical_operator = BuildPhysicalOperator(input_logical_node);

auto *logical_unnest = static_cast<LogicalUnnest *>(logical_operator.get());
return MakeUnique<PhysicalUnnest>(logical_unnest->node_id(),
std::move(input_physical_operator),
logical_unnest->expression_list(),
logical_unnest->load_metas());
}

UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<LogicalNode> &logical_operator) const {

auto input_logical_node = logical_operator->left_node();
Expand Down
2 changes: 2 additions & 0 deletions src/executor/physical_planner.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ private:
// Read cache
[[nodiscard]] UniquePtr<PhysicalOperator> BuildReadCache(const SharedPtr<LogicalNode> &logical_operator) const;

[[nodiscard]] UniquePtr<PhysicalOperator> BuildUnnest(const SharedPtr<LogicalNode> &logical_operator) const;

// Explain
[[nodiscard]] UniquePtr<PhysicalOperator> BuildExplain(const SharedPtr<LogicalNode> &logical_operator) const;
};
Expand Down
28 changes: 26 additions & 2 deletions src/planner/bound_select_statement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import logical_import;
import logical_dummy_scan;
import logical_match;
import logical_fusion;
import logical_unnest;

import subquery_unnest;

Expand Down Expand Up @@ -101,6 +102,12 @@ SharedPtr<LogicalNode> BoundSelectStatement::BuildPlan(QueryContext *query_conte
root = filter;
}

if (!unnest_expressions_.empty()) {
SharedPtr<LogicalNode> unnest = BuildUnnest(root, unnest_expressions_, query_context, bind_context);
unnest->set_left_node(root);
root = unnest;
}

if (!group_by_expressions_.empty() || !aggregate_expressions_.empty()) {
// Build logical aggregate
auto base_table_ref = std::static_pointer_cast<BaseTableRef>(table_ref_ptr_);
Expand Down Expand Up @@ -143,7 +150,11 @@ SharedPtr<LogicalNode> BoundSelectStatement::BuildPlan(QueryContext *query_conte
root = top;
}
} else if (limit_expression_.get() != nullptr) {
auto limit = MakeShared<LogicalLimit>(bind_context->GetNewLogicalNodeId(), std::static_pointer_cast<BaseTableRef>(table_ref_ptr_), limit_expression_, offset_expression_, total_hits_count_flag_);
auto limit = MakeShared<LogicalLimit>(bind_context->GetNewLogicalNodeId(),
std::static_pointer_cast<BaseTableRef>(table_ref_ptr_),
limit_expression_,
offset_expression_,
total_hits_count_flag_);
limit->set_left_node(root);
root = limit;
}
Expand Down Expand Up @@ -389,7 +400,11 @@ SharedPtr<LogicalNode> BoundSelectStatement::BuildPlan(QueryContext *query_conte
}

if (limit_expression_.get() != nullptr) {
auto limit = MakeShared<LogicalLimit>(bind_context->GetNewLogicalNodeId(), base_table_ref, limit_expression_, offset_expression_, total_hits_count_flag_);
auto limit = MakeShared<LogicalLimit>(bind_context->GetNewLogicalNodeId(),
base_table_ref,
limit_expression_,
offset_expression_,
total_hits_count_flag_);
limit->set_left_node(root);
root = limit;
}
Expand Down Expand Up @@ -565,6 +580,15 @@ SharedPtr<LogicalNode> BoundSelectStatement::BuildFilter(SharedPtr<LogicalNode>
return filter;
}

SharedPtr<LogicalNode> BoundSelectStatement::BuildUnnest(SharedPtr<LogicalNode> &root,
Vector<SharedPtr<BaseExpression>> &expressions,
QueryContext *query_context,
const SharedPtr<BindContext> &bind_context) {
// SharedPtr<LogicalUnnest> unnest
auto unnest = MakeShared<LogicalUnnest>(bind_context->GetNewLogicalNodeId(), expressions);
return unnest;
}

void BoundSelectStatement::BuildSubquery(SharedPtr<LogicalNode> &root,
SharedPtr<BaseExpression> &condition,
QueryContext *query_context,
Expand Down
5 changes: 5 additions & 0 deletions src/planner/bound_select_statement.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public:
QueryContext *query_context,
const SharedPtr<BindContext> &bind_context);

SharedPtr<LogicalNode> BuildUnnest(SharedPtr<LogicalNode> &root,
Vector<SharedPtr<BaseExpression>> &expressions,
QueryContext *query_context,
const SharedPtr<BindContext> &bind_context);

void BuildSubquery(SharedPtr<LogicalNode> &root,
SharedPtr<BaseExpression> &condition,
QueryContext *query_context,
Expand Down
8 changes: 6 additions & 2 deletions src/planner/explain_logical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ import logical_match;
import logical_match_tensor_scan;
import logical_match_sparse_scan;
import logical_fusion;
import base_expression;

import logical_unnest;
import logical_node_type;
import third_party;

import base_expression;
import expression_type;
import knn_expression;
import aggregate_expression;
Expand Down Expand Up @@ -111,6 +111,10 @@ Status ExplainLogicalPlan::Explain(const LogicalNode *statement, SharedPtr<Vecto
Explain((LogicalFilter *)statement, result, intent_size);
break;
}
case LogicalNodeType::kUnnest: {
Explain((LogicalUnnest *)statement, result, intent_size);
break;
}
case LogicalNodeType::kProjection: {
Explain((LogicalProject *)statement, result, intent_size);
break;
Expand Down
Loading

0 comments on commit ba245a6

Please sign in to comment.