Skip to content

Commit 06c61b8

Browse files
committed
rebase and address comments
1 parent 9db6463 commit 06c61b8

File tree

3 files changed

+28
-8
lines changed

3 files changed

+28
-8
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.calcite.sql.validate;
1818

19+
import org.apache.flink.table.api.ValidationException;
1920
import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
2021
import org.apache.flink.table.planner.functions.sql.ml.SqlVectorSearchTableFunction;
2122

@@ -178,10 +179,12 @@
178179
*
179180
* <p>Lines 2571 ~ 2588, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
180181
*
181-
* <p>Lines 3840 ~ 3844, 6511 ~ 6517 Flink improves Optimize the retrieval of sub-operands in
182+
* <p>Line 2618 ~2631, set the correct scope for VECTOR_SEARCH.
183+
*
184+
* <p>Lines 3920 ~ 3925, 6599 ~ 6606 Flink improves Optimize the retrieval of sub-operands in
182185
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
183186
*
184-
* <p>Lines 5315 ~ 5321, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
187+
* <p>Lines 5340 ~ 5347, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
185188
*/
186189
public class SqlValidatorImpl implements SqlValidatorWithHints {
187190
// ~ Static fields/initializers ---------------------------------------------
@@ -2571,6 +2574,10 @@ private SqlNode registerFrom(
25712574
case LATERAL:
25722575
// ----- FLINK MODIFICATION BEGIN -----
25732576
SqlBasicCall sbc = (SqlBasicCall) node;
2577+
// Put the usingScope which is a JoinScope,
2578+
// in order to make visible the left items
2579+
// of the JOIN tree.
2580+
scopes.put(node, usingScope);
25742581
registerFrom(
25752582
parentScope,
25762583
usingScope,
@@ -2581,10 +2588,6 @@ private SqlNode registerFrom(
25812588
extendList,
25822589
forceNullable,
25832590
true);
2584-
// Put the usingScope which is a JoinScope,
2585-
// in order to make visible the left items
2586-
// of the JOIN tree.
2587-
scopes.put(node, usingScope);
25882591
return sbc;
25892592
// ----- FLINK MODIFICATION END -----
25902593

@@ -2625,8 +2628,14 @@ private SqlNode registerFrom(
26252628
.isA(
26262629
new HashSet<>(
26272630
Collections.singletonList(SqlKind.SELECT)))) {
2631+
boolean queryColumnIsNotLiteral =
2632+
binding.operand(2).getKind() != SqlKind.LITERAL;
2633+
if (!queryColumnIsNotLiteral && !lateral) {
2634+
throw new ValidationException(
2635+
"The query column is not literal, please use LATERAL TABLE to run VECTOR_SEARCH.");
2636+
}
26282637
SqlValidatorScope scope = getSelectScope((SqlSelect) binding.operand(0));
2629-
scopes.put(node, scope);
2638+
scopes.put(enclosingNode, scope);
26302639
return newNode;
26312640
}
26322641
// ----- FLINK MODIFICATION END -----

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
2323
import org.apache.flink.table.planner.functions.sql.internal.SqlAuxiliaryGroupAggFunction;
2424
import org.apache.flink.table.planner.functions.sql.ml.SqlMLEvaluateTableFunction;
25-
import org.apache.flink.table.planner.functions.sql.ml.SqlMLPredictTableFunction;
2625
import org.apache.flink.table.planner.functions.sql.ml.SqlVectorSearchTableFunction;
2726
import org.apache.flink.table.planner.plan.type.FlinkReturnTypes;
2827
import org.apache.flink.table.planner.plan.type.NumericExceptFirstOperandChecker;

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,18 @@ void testLiteralValue() {
8989
+ "+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, VectorTable]], fields=[e, f, g])"));
9090
}
9191

92+
@Test
93+
void testLiteralValueWithoutLateralKeyword() {
94+
String sql =
95+
"SELECT * FROM TABLE(VECTOR_SEARCH(TABLE VectorTable, DESCRIPTOR(`g`), ARRAY[1.5, 2.0], 10))";
96+
assertThatThrownBy(() -> util.verifyRelPlan(sql))
97+
.satisfies(
98+
FlinkAssertions.anyCauseMatches(
99+
TableException.class,
100+
"FlinkLogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'g'), ARRAY(1.5:DECIMAL(2, 1), 2.0:DECIMAL(2, 1)), 10)], rowType=[RecordType(INTEGER e, BIGINT f, FLOAT ARRAY g, DOUBLE score)])\n"
101+
+ "+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, VectorTable]], fields=[e, f, g])"));
102+
}
103+
92104
@Test
93105
void testNamedArgument() {
94106
String sql =

0 commit comments

Comments
 (0)