Skip to content

Commit ca0a458

Browse files
committed
[KQP RBO] Aggregation add support empty keys
1 parent 58ec6db commit ca0a458

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,23 @@ TExprNode::TPtr PruneCast(TExprNode::TPtr node) {
115115
return node;
116116
}
117117

118+
TVector<TInfoUnit> GetHashableKeys(const std::shared_ptr<IOperator> &input) {
119+
if (!input->Type) {
120+
return input->GetOutputIUs();
121+
}
122+
123+
const auto *inputType = input->Type;
124+
TVector<TInfoUnit> hashableKeys;
125+
const auto* structType = inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
126+
for (const auto &item : structType->GetItems()) {
127+
if (item->GetItemType()->IsHashable()) {
128+
hashableKeys.push_back(TInfoUnit(TString(item->GetName())));
129+
}
130+
}
131+
132+
return hashableKeys;
133+
}
134+
118135
bool IsNullRejectingPredicate(const TFilterInfo &filter, TExprContext &ctx) {
119136
Y_UNUSED(ctx);
120137
#ifdef DEBUG_PREDICATE
@@ -576,8 +593,9 @@ bool TAssignStagesRule::TestAndApply(std::shared_ptr<IOperator> &input, TRBOCont
576593
const auto newStageId = props.StageGraph.AddStage();
577594
aggregate->Props.StageId = newStageId;
578595
const bool isInputSourceStage = props.StageGraph.IsSourceStage(inputStageId);
596+
const auto shuffleKeys = aggregate->KeyColumns.size() ? aggregate->KeyColumns : GetHashableKeys(aggregate->GetInput());
579597

580-
props.StageGraph.Connect(inputStageId, newStageId, std::make_shared<TShuffleConnection>(aggregate->KeyColumns, isInputSourceStage));
598+
props.StageGraph.Connect(inputStageId, newStageId, std::make_shared<TShuffleConnection>(shuffleKeys, isInputSourceStage));
581599
YQL_CLOG(TRACE, CoreDq) << "Assign stage to Aggregation ";
582600
} else {
583601
Y_ENSURE(false, "Unknown operator encountered");

ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,11 +423,19 @@ Y_UNIT_TEST_SUITE(KqpRbo) {
423423
SET TablePathPrefix = "/Root/";
424424
select t1.b, sum(t1.c) from t1 inner join t2 on t1.a = t2.a group by t1.b order by t1.b;
425425
)",
426+
R"(
427+
--!syntax_pg
428+
SET TablePathPrefix = "/Root/";
429+
select sum(t1.c) from t1 group by t1.b
430+
union all
431+
select sum(t1.b) from t1;
432+
)",
426433
};
427434

428435
std::vector<std::string> results = {
429436
R"([["1";"4"];["2";"6"]])",
430-
R"([["1";"4"];["2";"6"]])"
437+
R"([["1";"4"];["2";"6"]])",
438+
R"([["6"];["4"];["8"]])"
431439
};
432440

433441
for (ui32 i = 0; i < queries.size(); ++i) {

0 commit comments

Comments
 (0)