Skip to content

Commit

Permalink
Create translation function library
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Henneberger <[email protected]>
  • Loading branch information
henneberger committed Nov 10, 2024
1 parent fda065d commit 59b725b
Show file tree
Hide file tree
Showing 17 changed files with 177 additions and 41 deletions.
6 changes: 6 additions & 0 deletions sqrl-calcite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-jdbc-1.19</artifactId>
</dependency>
<dependency>
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-translate-common</artifactId>
<version>0.5.9-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.tools.Programs;

Expand All @@ -31,7 +32,7 @@ public DialectCallConverter(RelOptPlanner planner) {
this.planner = planner;
}

public RelNode convert(Dialect dialect, RelNode relNode) {
public RelNode convert(SqlDialect dialect, RelNode relNode) {
Map<SqlOperator, RuleTransform> transforms = extractFunctionTransforms(relNode);

List<RelRule> rules = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import com.datasqrl.calcite.convert.SqlNodeToString;
import com.datasqrl.calcite.convert.SqlNodeToString.SqlStrings;
import com.datasqrl.calcite.convert.SqlToStringFactory;
import com.datasqrl.calcite.dialect.ExtendedPostgresSqlDialect;
import com.datasqrl.calcite.dialect.ExtendedSnowflakeSqlDialect;
import com.datasqrl.calcite.schema.ExpandTableMacroRule.ExpandTableMacroConfig;
import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlSelectBuilder;
import com.datasqrl.canonicalizer.ReservedName;
import com.datasqrl.engine.stream.flink.sql.calcite.FlinkDialect;
import com.datasqrl.jdbc.SqrlPostgresDialect;
import com.datasqrl.parse.SqrlParserImpl;
import com.datasqrl.util.DataContextImpl;
import java.util.Arrays;
Expand Down Expand Up @@ -270,7 +274,22 @@ public RelNode runStage(OptimizationStage stage, RelNode relNode) {

public RelNode convertRelToDialect(Dialect dialect, RelNode relNode) {
return new DialectCallConverter(planner)
.convert(dialect, relNode);
.convert(dialectToSqlDialect(dialect), relNode);
}

//temporary until we remove the Dialect enum
private SqlDialect dialectToSqlDialect(Dialect dialect) {
switch (dialect){
case SQRL:
case CALCITE:
case POSTGRES:
return ExtendedPostgresSqlDialect.DEFAULT;
case FLINK:
return FlinkDialect.DEFAULT;
case SNOWFLAKE:
return ExtendedSnowflakeSqlDialect.DEFAULT;
}
throw new RuntimeException("Unknown dialect:" + dialect);
}

public static SqlNodes relToSql(Dialect dialect, RelNode relNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;

public class FlinkDialect extends SqlDialect {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package com.datasqrl.function.translations;

import com.datasqrl.calcite.Dialect;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlOperator;

@Getter
@AllArgsConstructor
public abstract class DialectSqlTranslation implements SqlTranslation {

private final Dialect dialect;
private final SqlDialect sqlDialect;
private final SqlOperator operator;

public DialectSqlTranslation(Dialect dialect, SqlOperator operator) {
this.dialect = dialect;
this.operator = operator;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.datasqrl.function.translations;

import com.datasqrl.calcite.Dialect;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;

@Getter
public abstract class PostgresSqlTranslation extends DialectSqlTranslation {

public PostgresSqlTranslation(SqlOperator operator) {
super(Dialect.POSTGRES, operator);
super(PostgresqlSqlDialect.DEFAULT, operator);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.datasqrl.function.translations;

import com.datasqrl.calcite.Dialect;
import com.datasqrl.calcite.dialect.ExtendedSnowflakeSqlDialect;
import lombok.Getter;
import org.apache.calcite.sql.SqlOperator;

@Getter
public abstract class SnowflakeSqlTranslation extends DialectSqlTranslation {
public SnowflakeSqlTranslation(SqlOperator operator) {
super(Dialect.SNOWFLAKE, operator);
super(ExtendedSnowflakeSqlDialect.DEFAULT, operator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.datasqrl.calcite.convert.SimpleCallTransform.SimpleCallTransformConfig;
import com.datasqrl.calcite.convert.SimplePredicateTransform;
import com.datasqrl.calcite.convert.SimplePredicateTransform.SimplePredicateTransformConfig;
import com.datasqrl.calcite.dialect.ExtendedPostgresSqlDialect;
import com.datasqrl.calcite.function.RuleTransform;
import com.datasqrl.canonicalizer.Name;
import com.datasqrl.function.PgSpecificOperatorTable;
Expand All @@ -18,16 +19,18 @@
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;

@AutoService(RuleTransform.class)
public class TextSearchTranslation implements RuleTransform {

@Override
public List<RelRule> transform(Dialect dialect, SqlOperator operator) {
if (dialect != Dialect.POSTGRES) {
public List<RelRule> transform(SqlDialect dialect, SqlOperator operator) {
if (!dialect.getClass().isAssignableFrom(ExtendedPostgresSqlDialect.class)) {
return List.of();
}
return List.of(
Expand Down
7 changes: 7 additions & 0 deletions sqrl-flink-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<module>sqrl-flexible-csv</module>
<module>sqrl-name</module>
<module>sqrl-errors</module>
<module>sqrl-translate-common</module>
</modules>

<description>Parent pom</description>
Expand Down Expand Up @@ -107,6 +108,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-jdbc-1.19</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-json</artifactId>
Expand Down
41 changes: 41 additions & 0 deletions sqrl-flink-lib/sqrl-translate-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-flink-lib</artifactId>
<version>0.5.9-SNAPSHOT</version>
</parent>

<artifactId>sqrl-translate-common</artifactId>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.32.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.19.1</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-jdbc-1.19</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package com.datasqrl.calcite.dialect;


import static org.apache.calcite.sql.SqlKind.COLLECTION_TABLE;

import com.datasqrl.calcite.Dialect;
import com.datasqrl.function.translations.SqlTranslation;
import com.datasqrl.function.util.ServiceLoaderDiscovery;
import com.datasqrl.type.JdbcTypeSerializer;
import com.datasqrl.util.ServiceLoaderDiscovery;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.avatica.util.Casing;
Expand All @@ -16,19 +13,18 @@
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
import org.apache.calcite.sql.fun.SqlCollectionTableOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.flink.table.planner.plan.schema.RawRelDataType;

public class ExtendedPostgresSqlDialect extends PostgresqlSqlDialect {

public static final Map<String, SqlTranslation> translationMap = ServiceLoaderDiscovery.getAll(SqlTranslation.class)
.stream().filter(f->f.getDialect() == Dialect.POSTGRES)
.stream().filter(f->f.getSqlDialect().getClass().isAssignableFrom(ExtendedPostgresSqlDialect.class))
.collect(Collectors.toMap(f->f.getOperator().getName().toLowerCase(), f->f));

public static final ExtendedPostgresSqlDialect.Context DEFAULT_CONTEXT;
Expand All @@ -37,7 +33,7 @@ public class ExtendedPostgresSqlDialect extends PostgresqlSqlDialect {
static {
DEFAULT_CONTEXT = SqlDialect.EMPTY_CONTEXT.withDatabaseProduct(DatabaseProduct.POSTGRESQL)
.withIdentifierQuoteString("\"").withUnquotedCasing(Casing.TO_LOWER)
.withDataTypeSystem(POSTGRESQL_TYPE_SYSTEM)
.withDataTypeSystem(PostgresqlSqlDialect.POSTGRESQL_TYPE_SYSTEM)
.withConformance(new PostgresConformance());
DEFAULT = new ExtendedPostgresSqlDialect(DEFAULT_CONTEXT);
}
Expand All @@ -51,7 +47,7 @@ public ExtendedPostgresSqlDialect(Context context) {
private static Map<Class, String> getForeignCastSpecs() {
Map<Class, String> jdbcTypeSerializer = ServiceLoaderDiscovery.getAll(JdbcTypeSerializer.class)
.stream()
.filter(f->f.getDialectId().equalsIgnoreCase(Dialect.POSTGRES.name()))
.filter(f->f.getDialectId().equalsIgnoreCase("POSTGRES"))
.collect(Collectors.toMap(JdbcTypeSerializer::getConversionClass,
JdbcTypeSerializer::dialectTypeName));
return jdbcTypeSerializer;
Expand Down Expand Up @@ -133,7 +129,7 @@ public SqlDataTypeSpec getCastSpec(RelDataType type) {

@Override
public void unparseCall(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
if (call.getOperator().getKind() == COLLECTION_TABLE) { //skip FROM TABLE(..) call
if (call.getOperator().getKind() == SqlKind.COLLECTION_TABLE) { //skip FROM TABLE(..) call
unparseCall(writer, (SqlCall)call.getOperandList().get(0), leftPrec, rightPrec);
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.datasqrl.calcite.dialect;

import com.datasqrl.calcite.Dialect;
import com.datasqrl.function.translations.SqlTranslation;
import com.datasqrl.util.ServiceLoaderDiscovery;
import com.datasqrl.function.util.ServiceLoaderDiscovery;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.avatica.util.Casing;
Expand All @@ -14,7 +13,7 @@
public class ExtendedSnowflakeSqlDialect extends SnowflakeSqlDialect {
public static final Map<String, SqlTranslation> translationMap = ServiceLoaderDiscovery
.getAll(SqlTranslation.class)
.stream().filter(f->f.getDialect() == Dialect.SNOWFLAKE)
.stream().filter(f->f.getSqlDialect().getClass().isAssignableFrom(ExtendedSnowflakeSqlDialect.class))
.collect(Collectors.toMap(f->f.getOperator().getName().toLowerCase(), f->f));

public static final SqlDialect.Context DEFAULT_CONTEXT;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.datasqrl.calcite.function;

import com.datasqrl.calcite.Dialect;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlOperator;

import java.util.List;
Expand All @@ -12,7 +12,7 @@ public interface RuleTransform {
* may not be the same operator your 'this' since it may undergo delegation so it is passed
* as a parameter.
*/
List<RelRule> transform(Dialect dialect, SqlOperator operator /* todo engine capabilities*/);
List<RelRule> transform(SqlDialect dialect, SqlOperator operator /* todo engine capabilities*/);

String getRuleOperatorName();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.datasqrl.function.translations;

import com.datasqrl.calcite.Dialect;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;

public interface SqlTranslation {
Dialect getDialect();
SqlDialect getSqlDialect();
SqlOperator getOperator();
void unparse(SqlCall call, SqlWriter writer, int leftPrec, int rightPrec);
}
Loading

0 comments on commit 59b725b

Please sign in to comment.