[FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser (#18)
WencongLiu authored May 24, 2024
1 parent 0421326 commit bb95109
Showing 7 changed files with 263 additions and 2 deletions.
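For orientation before the diff: a minimal sketch of how the new switch is meant to be used, assuming the wiring shown by the IT test included in this commit. The class name, the "myhive" catalog, and the orders table below are illustrative placeholders, not part of the patch. With hive.support.sql11.reserved.keywords set to false, the Hive dialect parser accepts SQL 2011 reserved words such as date as plain identifiers.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class Sql11KeywordsExample {
    public static void main(String[] args) {
        // Placeholder catalog; the IT test builds one via HiveTestUtils instead.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", null);
        // Opt out of SQL 2011 keyword reservation before opening the catalog.
        hiveCatalog
                .getHiveConf()
                .setBoolean("hive.support.sql11.reserved.keywords", false);
        hiveCatalog.open();

        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // "date" is reserved in SQL 2011; with the flag disabled it parses
        // as an ordinary column name, no backticks required.
        tEnv.executeSql("create table orders (id int, date int)");
    }
}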
Original file line number Diff line number Diff line change
@@ -607,8 +607,7 @@ private DataStreamSink<Row> createBatchNoCompactSink(
builder.setOverwrite(overwrite);
builder.setIsToLocal(isToLocal);
builder.setStaticPartitions(staticPartitionSpec);
-builder.setTempPath(
-        new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
+builder.setPath(new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));

liucongjy commented on Jun 27, 2024:

Flink 1.18 does not have this builder.setPath() method, so compilation fails.

builder.setOutputFileConfig(fileNaming);
builder.setIdentifier(identifier);
builder.setPartitionCommitPolicyFactory(
@@ -30,4 +30,6 @@ public class HiveParserConstants
/* Constants for insert overwrite directory */
public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
public static final String HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS =
"hive.support.sql11.reserved.keywords";
}
@@ -37,6 +37,9 @@ k=3;
RecognitionException e) {
gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
}
protected boolean useSQL11ReservedKeywordsForIdentifier() {
return gParent.useSQL11ReservedKeywordsForIdentifier();
}
}
@rulecatch {
@@ -417,6 +417,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
import static org.apache.flink.table.planner.delegation.hive.HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS;
}
@@ -721,6 +722,12 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
public void setHiveConf(Configuration hiveConf) {
this.hiveConf = hiveConf;
}
protected boolean useSQL11ReservedKeywordsForIdentifier() {
    if (hiveConf == null) {
        return false;
    }
    return !hiveConf.getBoolean(HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, true);
}
}

@rulecatch {
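A note on the double negative in useSQL11ReservedKeywordsForIdentifier() above: hive.support.sql11.reserved.keywords defaults to true (keywords stay reserved), so the predicate only becomes true, and the keyword-as-identifier branch only opens, when the property is explicitly set to false. A stand-alone sketch of just that logic with a plain Hadoop Configuration (illustration only, not part of the patch):

import org.apache.hadoop.conf.Configuration;

public class KeywordFlagDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Property unset: getBoolean falls back to the default true, the
        // negation yields false, and the keywords stay reserved.
        System.out.println(
                !conf.getBoolean("hive.support.sql11.reserved.keywords", true)); // false

        // Property explicitly disabled: the negation yields true and the
        // gated sql11ReservedKeywordsUsedAsIdentifier alternative is enabled.
        conf.setBoolean("hive.support.sql11.reserved.keywords", false);
        System.out.println(
                !conf.getBoolean("hive.support.sql11.reserved.keywords", true)); // true
    }
}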
@@ -37,6 +37,9 @@ k=3;
RecognitionException e) {
gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
}
protected boolean useSQL11ReservedKeywordsForIdentifier() {
return gParent.useSQL11ReservedKeywordsForIdentifier();
}
}
@rulecatch {
@@ -730,6 +733,8 @@ identifier
:
Identifier
| nonReserved -> Identifier[$nonReserved.start]
// The reserved keywords in SQL 2011 can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
| {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.start]
;
functionIdentifier
@@ -806,3 +811,22 @@ sql11ReservedKeywordsUsedAsFunctionName
:
KW_IF | KW_ARRAY | KW_MAP | KW_BIGINT | KW_BINARY | KW_BOOLEAN | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_DATE | KW_DOUBLE | KW_FLOAT | KW_GROUPING | KW_INT | KW_SMALLINT | KW_TIMESTAMP
;
// The following SQL 2011 reserved keywords can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
sql11ReservedKeywordsUsedAsIdentifier
:
KW_ALL | KW_ALTER | KW_ARRAY | KW_AS | KW_AUTHORIZATION | KW_BETWEEN | KW_BIGINT | KW_BINARY | KW_BOOLEAN
| KW_BOTH | KW_BY | KW_CREATE | KW_CUBE | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_CURSOR | KW_DATE | KW_DECIMAL | KW_DELETE | KW_DESCRIBE
| KW_DOUBLE | KW_DROP | KW_EXISTS | KW_EXTERNAL | KW_FALSE | KW_FETCH | KW_FLOAT | KW_FOR | KW_FULL | KW_GRANT
| KW_GROUP | KW_GROUPING | KW_IMPORT | KW_IN | KW_INNER | KW_INSERT | KW_INT | KW_INTERSECT | KW_INTO | KW_IS | KW_LATERAL
| KW_LEFT | KW_LIKE | KW_LOCAL | KW_NONE | KW_NULL | KW_OF | KW_ORDER | KW_OUT | KW_OUTER | KW_PARTITION
| KW_PERCENT | KW_PROCEDURE | KW_RANGE | KW_READS | KW_REVOKE | KW_RIGHT
| KW_ROLLUP | KW_ROW | KW_ROWS | KW_SET | KW_SMALLINT | KW_TABLE | KW_TIMESTAMP | KW_TO | KW_TRIGGER | KW_TRUE
| KW_TRUNCATE | KW_UNION | KW_UPDATE | KW_USER | KW_USING | KW_VALUES | KW_WITH
| KW_REGEXP | KW_RLIKE
| KW_PRIMARY
| KW_FOREIGN
| KW_CONSTRAINT
| KW_REFERENCES
| KW_PRECISION
;
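Once the predicate holds, each keyword above is rewritten to a plain Identifier token (the -> Identifier[...] rewrite in the identifier rule), so statements of the following shapes parse without backticks. These are taken from the templates in the new IT test, with "date" standing in for any listed keyword; the table and column names are shortened placeholders:

public class KeywordStatementShapes {
    // A reserved keyword as a column name.
    static final String DDL_COLUMN =
            "create table t1 (x int, date int)";
    // A reserved keyword as a partition column.
    static final String DDL_PARTITION =
            "create table t2 (x int) partitioned by (date string, q string)";
    // The same keyword inside a window specification.
    static final String DQL_WINDOW =
            "select avg(date) over (partition by dep) as avgsal from t4";
}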
@@ -37,6 +37,9 @@ k=3;
RecognitionException e) {
gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
}
protected boolean useSQL11ReservedKeywordsForIdentifier() {
return gParent.useSQL11ReservedKeywordsForIdentifier();
}
}
@rulecatch {
@@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connectors.hive;

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.module.CoreModule;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
import org.apache.flink.util.CollectionUtil;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import static org.assertj.core.api.Assertions.assertThatNoException;

/** Tests using SQL 2011 reserved keywords as identifiers in Hive dialect queries. */
class HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest {
private static HiveCatalog hiveCatalog;
private static TableEnvironment tableEnv;
private static List<String> sql11ReservedKeywords;

@BeforeAll
static void setup() throws Exception {
hiveCatalog = HiveTestUtils.createHiveCatalog();
hiveCatalog
.getHiveConf()
.setBoolean(HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, false);
hiveCatalog.open();
tableEnv = getTableEnvWithHiveCatalog();
sql11ReservedKeywords =
Arrays.asList(
"ALL",
"ALTER",
"ARRAY",
"AS",
"AUTHORIZATION",
"BETWEEN",
"BIGINT",
"BINARY",
"BOOLEAN",
"BOTH",
"BY",
"CREATE",
"CUBE",
"CURRENT_DATE",
"CURRENT_TIMESTAMP",
"CURSOR",
"DATE",
"DECIMAL",
"DELETE",
"DESCRIBE",
"DOUBLE",
"DROP",
"EXISTS",
"EXTERNAL",
"FALSE",
"FETCH",
"FLOAT",
"FOR",
"FULL",
"GRANT",
"GROUP",
"GROUPING",
"IMPORT",
"IN",
"INNER",
"INSERT",
"INT",
"INTERSECT",
"INTO",
"IS",
"LATERAL",
"LEFT",
"LIKE",
"LOCAL",
"NONE",
"NULL",
"OF",
"ORDER",
"OUT",
"OUTER",
"PARTITION",
"PERCENT",
"PROCEDURE",
"RANGE",
"READS",
"REVOKE",
"RIGHT",
"ROLLUP",
"ROW",
"ROWS",
"SET",
"SMALLINT",
"TABLE",
"TIMESTAMP",
"TO",
"TRIGGER",
"TRUE",
"TRUNCATE",
"UNION",
"UPDATE",
"USER",
"USING",
"VALUES",
"WITH",
"REGEXP",
"RLIKE",
"PRIMARY",
"FOREIGN",
"CONSTRAINT",
"REFERENCES",
"PRECISION");
}

@Test
void testReservedKeywordAsIdentifierInDDL() {
List<String> toRun =
new ArrayList<>(
Arrays.asList(
"create table table1 (x int, %s int)",
"create table table2 (x int) partitioned by (%s string, q string)",
"create table table3 (\n"
+ " a int,\n"
+ " %s struct<f1: boolean, f2: string, f3: struct<f4: int, f5: double>, f6: int>\n"
+ ")"));
Random random = new Random();
for (String queryTemplate : toRun) {
// Select a random keyword.
String chosenKeyword =
sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
String finalQuery = String.format(queryTemplate, chosenKeyword);
runQuery(finalQuery);
}
}

@Test
void testReservedKeywordAsIdentifierInDQL() {
List<String> toRun =
new ArrayList<>(
Arrays.asList(
"create table table4(id int,name string,dep string,%s int,age int)",
"select avg(%s) over (partition by dep) as avgsal from table4",
"select dep,name,%s from (select dep,name,%s,rank() over "
+ "(partition by dep order by %s desc) as rnk from table4) a where rnk=1",
"select %s,sum(cnt) over (order by %s)/sum(cnt) over "
+ "(order by %s ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from"
+ " (select %s,count(*) as cnt from table4 group by %s) a"));
Random random = new Random();
// Select a random keyword.
String chosenKeyword =
sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
for (String queryTemplate : toRun) {
String finalQuery = queryTemplate.replace("%s", chosenKeyword);
runQuery(finalQuery);
}
}

@Test
void testReservedKeywordAsIdentifierInDML() {
List<String> toRun =
new ArrayList<>(
Arrays.asList(
"create table table5 (%s string, value string)",
"create table table6(key int, ten int, one int, value string)",
"from table5 insert overwrite table table6 map table5.%s,"
+ " CAST(table5.%s / 10 AS INT), CAST(table5.%s % 10 AS INT),"
+ " table5.value using 'cat' as (tkey, ten, one, tvalue)"
+ " distribute by tvalue, tkey"));
Random random = new Random();
// Select a random keyword.
String chosenKeyword =
sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
for (String queryTemplate : toRun) {
String finalQuery = queryTemplate.replace("%s", chosenKeyword);
runQuery(finalQuery);
}
}

private void runQuery(String query) {
assertThatNoException()
.isThrownBy(
() -> CollectionUtil.iteratorToList(tableEnv.executeSql(query).collect()));
}

private static TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
// Reload modules with hive first so Hive built-in functions are resolved before core ones.
HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
CoreModule coreModule = CoreModule.INSTANCE;
for (String loaded : tableEnv.listModules()) {
tableEnv.unloadModule(loaded);
}
tableEnv.loadModule("hive", hiveModule);
tableEnv.loadModule("core", coreModule);
return tableEnv;
}
}
