Skip to content

Commit

Permalink
Support Unnest (#2408)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Unnest syntax
2. Logical plan node
3. Physical plan node

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Dec 26, 2024
1 parent 31fa6b1 commit ccc4998
Show file tree
Hide file tree
Showing 31 changed files with 3,516 additions and 2,953 deletions.
48 changes: 48 additions & 0 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import physical_merge_aggregate;
import status;
import physical_operator_type;
import physical_read_cache;
import physical_unnest;

import explain_logical_plan;
import logical_show;
Expand Down Expand Up @@ -317,6 +318,10 @@ void ExplainPhysicalPlan::Explain(const PhysicalOperator *op, SharedPtr<Vector<S
Explain(static_cast<const PhysicalReadCache *>(op), result, intent_size);
break;
}
case PhysicalOperatorType::kUnnest: {
Explain(static_cast<const PhysicalUnnest *>(op), result, intent_size);
break;
}
default: {
String error_message = "Unexpected physical operator type";
UnrecoverableError(error_message);
Expand Down Expand Up @@ -2769,4 +2774,47 @@ void ExplainPhysicalPlan::Explain(const PhysicalReadCache *read_cache_node, Shar
result->emplace_back(MakeShared<String>(output_columns));
}

void ExplainPhysicalPlan::Explain(const PhysicalUnnest *unnest_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
String unnest_node_header_str;
if (intent_size != 0) {
unnest_node_header_str = String(intent_size - 2, ' ') + "-> Unnest ";
} else {
unnest_node_header_str = "\"-> Unnest \" ";
}

unnest_node_header_str += "(" + std::to_string(unnest_node->node_id()) + ")";
result->emplace_back(MakeShared<String>(unnest_node_header_str));

// Unnest expression
{
String unnest_expression_str = String(intent_size, ' ') + " - unnest expression: [";
Vector<SharedPtr<BaseExpression>> expression_list = unnest_node->expression_list();
SizeT expression_count = expression_list.size();
for (SizeT idx = 0; idx < expression_count - 1; ++idx) {
ExplainLogicalPlan::Explain(expression_list[idx].get(), unnest_expression_str);
unnest_expression_str += ", ";
}
ExplainLogicalPlan::Explain(expression_list.back().get(), unnest_expression_str);
unnest_expression_str += "]";
result->emplace_back(MakeShared<String>(unnest_expression_str));
}

// unnest expression
String filter_str = String(intent_size, ' ') + " - filter: ";

result->emplace_back(MakeShared<String>(filter_str));

// Output column
{
String output_columns_str = String(intent_size, ' ') + " - output columns: [";
SharedPtr<Vector<String>> output_columns = unnest_node->GetOutputNames();
SizeT column_count = output_columns->size();
for (SizeT idx = 0; idx < column_count - 1; ++idx) {
output_columns_str += output_columns->at(idx) + ", ";
}
output_columns_str += output_columns->back() + "]";
result->emplace_back(MakeShared<String>(output_columns_str));
}
}

} // namespace infinity
3 changes: 3 additions & 0 deletions src/executor/explain_physical_plan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import physical_fusion;
import physical_merge_aggregate;
import physical_match_sparse_scan;
import physical_read_cache;
import physical_unnest;

export module explain_physical_plan;

Expand Down Expand Up @@ -184,6 +185,8 @@ public:
static void Explain(const PhysicalMergeAggregate *fusion_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalReadCache *read_cache_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalUnnest *unnest_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);
};

} // namespace infinity
1 change: 1 addition & 0 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
}
case PhysicalOperatorType::kParallelAggregate:
case PhysicalOperatorType::kFilter:
case PhysicalOperatorType::kUnnest:
case PhysicalOperatorType::kHash:
case PhysicalOperatorType::kLimit: {
if (phys_op->left() == nullptr) {
Expand Down
48 changes: 48 additions & 0 deletions src/executor/operator/physical_unnest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

module physical_unnest;

import stl;
import txn;
import query_context;
import table_def;
import data_table;

import physical_operator_type;
import operator_state;
import expression_state;
import expression_selector;
import data_block;
import logger;
import third_party;

import infinity_exception;

namespace infinity {

void PhysicalUnnest::Init() {
}

bool PhysicalUnnest::Execute(QueryContext *, OperatorState *operator_state) {
OperatorState* prev_op_state = operator_state->prev_op_state_;
if (prev_op_state->Complete()) {
operator_state->SetComplete();
}
return true;
}

} // namespace infinity
62 changes: 62 additions & 0 deletions src/executor/operator/physical_unnest.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module physical_unnest;

import stl;

import query_context;
import operator_state;
import physical_operator;
import physical_operator_type;
import base_expression;
import data_table;
import load_meta;
import infinity_exception;
import internal_types;
import data_type;

namespace infinity {

export class PhysicalUnnest : public PhysicalOperator {
public:
explicit PhysicalUnnest(u64 id,
UniquePtr<PhysicalOperator> left,
Vector<SharedPtr<BaseExpression>> expression_list,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kUnnest, std::move(left), nullptr, id, load_metas), expression_list_(std::move(expression_list)) {}

~PhysicalUnnest() override = default;

void Init() override;

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

inline SharedPtr<Vector<String>> GetOutputNames() const final { return PhysicalCommonFunctionUsingLoadMeta::GetOutputNames(*this); }

inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return PhysicalCommonFunctionUsingLoadMeta::GetOutputTypes(*this); }

SizeT TaskletCount() override { return left_->TaskletCount(); }

Vector<SharedPtr<BaseExpression>> expression_list() const { return expression_list_; }

private:
Vector<SharedPtr<BaseExpression>> expression_list_;

SharedPtr<DataTable> input_table_{};
};

} // namespace infinity
5 changes: 5 additions & 0 deletions src/executor/operator_state.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ export struct CompactFinishOperatorState : public OperatorState {
SharedPtr<CompactStateData> compact_state_data_{};
};

// Unnest
export struct UnnestOperatorState : public OperatorState {
inline explicit UnnestOperatorState() : OperatorState(PhysicalOperatorType::kUnnest) {}
};

// Source
export enum class SourceStateType {
kInvalid,
Expand Down
2 changes: 2 additions & 0 deletions src/executor/physical_operator_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ String PhysicalOperatorToString(PhysicalOperatorType type) {
return "CreateIndexFinish";
case PhysicalOperatorType::kReadCache:
return "ReadCache";
case PhysicalOperatorType::kUnnest:
return "Unnest";
}

Status status = Status::NotSupport("Unknown physical operator type");
Expand Down
1 change: 1 addition & 0 deletions src/executor/physical_operator_type.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export enum class PhysicalOperatorType : i8 {

kTableScan,
kFilter,
kUnnest,
kIndexScan,
kDummyScan,
kKnnScan,
Expand Down
26 changes: 26 additions & 0 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import physical_create_index_prepare;
import physical_create_index_do;
import physical_create_index_finish;
import physical_read_cache;
import physical_unnest;

import logical_node;
import logical_node_type;
Expand Down Expand Up @@ -130,6 +131,7 @@ import logical_match_tensor_scan;
import logical_match_sparse_scan;
import logical_fusion;
import logical_read_cache;
import logical_unnest;

import value;
import value_expression;
Expand Down Expand Up @@ -339,6 +341,10 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildPhysicalOperator(const SharedP
result = BuildReadCache(logical_operator);
break;
}
case LogicalNodeType::kUnnest: {
result = BuildUnnest(logical_operator);
break;
}
default: {
String error_message = fmt::format("Unknown logical node type: {}", logical_operator->name());
UnrecoverableError(error_message);
Expand Down Expand Up @@ -1190,6 +1196,26 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildReadCache(const SharedPtr<Logi
logical_read_cache->is_min_heap_);
}

UniquePtr<PhysicalOperator> PhysicalPlanner::BuildUnnest(const SharedPtr<LogicalNode> &logical_operator) const {
auto input_logical_node = logical_operator->left_node();
if (input_logical_node.get() == nullptr) {
String error_message = "Logical filter node has no input node.";
UnrecoverableError(error_message);
}
if (logical_operator->right_node().get() != nullptr) {
String error_message = "Logical filter node shouldn't have right child.";
UnrecoverableError(error_message);
}

auto input_physical_operator = BuildPhysicalOperator(input_logical_node);

auto *logical_unnest = static_cast<LogicalUnnest *>(logical_operator.get());
return MakeUnique<PhysicalUnnest>(logical_unnest->node_id(),
std::move(input_physical_operator),
logical_unnest->expression_list(),
logical_unnest->load_metas());
}

UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<LogicalNode> &logical_operator) const {

auto input_logical_node = logical_operator->left_node();
Expand Down
2 changes: 2 additions & 0 deletions src/executor/physical_planner.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ private:
// Read cache
[[nodiscard]] UniquePtr<PhysicalOperator> BuildReadCache(const SharedPtr<LogicalNode> &logical_operator) const;

[[nodiscard]] UniquePtr<PhysicalOperator> BuildUnnest(const SharedPtr<LogicalNode> &logical_operator) const;

// Explain
[[nodiscard]] UniquePtr<PhysicalOperator> BuildExplain(const SharedPtr<LogicalNode> &logical_operator) const;
};
Expand Down
Loading

0 comments on commit ccc4998

Please sign in to comment.