diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4c8a54841cd..f9752ddfcec 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -69,7 +69,7 @@ jobs: with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false - arguments: --scan --no-parallel --no-daemon build javadoc + arguments: --scan --no-parallel --no-daemon build javadoc --exclude-task :arrow:build - name: 'sqlline and sqllsh' shell: cmd run: | @@ -103,7 +103,7 @@ jobs: with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false - arguments: --scan --no-parallel --no-daemon build + arguments: --scan --no-parallel --no-daemon build --exclude-task :arrow:build - name: 'sqlline and sqllsh' shell: cmd run: | @@ -215,6 +215,7 @@ jobs: S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }} S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }} GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false @@ -241,6 +242,7 @@ jobs: S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }} S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }} GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false @@ -310,6 +312,7 @@ jobs: S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }} S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }} GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED with: job-id: jdk19 remote-build-cache-proxy-enabled: false diff --git a/Jenkinsfile b/Jenkinsfile index 53cc1e6f31f..f83fff47459 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -41,7 +41,7 @@ node('ubuntu') { } stage('Code Quality') { timeout(time: 1, unit: 'HOURS') { - withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17"]) { + withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17","_JAVA_OPTIONS=--add-opens=java.base/java.nio=ALL-UNNAMED"]) { withCredentials([string(credentialsId: 'SONARCLOUD_TOKEN', variable: 'SONAR_TOKEN')]) { if ( env.BRANCH_NAME.startsWith("PR-") ) { sh './gradlew --no-parallel --no-daemon jacocoAggregateTestReport sonar -PenableJacoco -Dsonar.pullrequest.branch=${CHANGE_BRANCH} -Dsonar.pullrequest.base=${CHANGE_TARGET} -Dsonar.pullrequest.key=${CHANGE_ID} -Dsonar.login=${SONAR_TOKEN}' diff --git a/arrow/build.gradle.kts b/arrow/build.gradle.kts new file mode 100644 index 00000000000..0ed73e8ddf3 --- /dev/null +++ b/arrow/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +dependencies { + api(project(":core")) + + implementation("com.google.guava:guava") + implementation("org.apache.arrow:arrow-memory-netty") + implementation("org.apache.arrow:arrow-vector") + implementation("org.apache.arrow.gandiva:arrow-gandiva") + annotationProcessor("org.immutables:value") + compileOnly("org.immutables:value-annotations") + + testImplementation("org.apache.arrow:arrow-jdbc") + testImplementation("net.hydromatic:scott-data-hsqldb") + testImplementation("org.apache.commons:commons-lang3") + testImplementation(project(":core")) + testImplementation(project(":testkit")) +} diff --git a/arrow/gradle.properties b/arrow/gradle.properties new file mode 100644 index 00000000000..4d2cfdd304a --- /dev/null +++ b/arrow/gradle.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +description=Arrow adapter for Calcite +artifact.name=Calcite Arrow diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java new file mode 100644 index 00000000000..d054ba8a4d8 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Filter; +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.vector.ipc.ArrowFileReader; + +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Enumerable that reads from Arrow value-vectors. 
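+ *
+ * <p>Wraps an {@link ArrowFileReader} together with an optional Gandiva
+ * {@link Projector} or {@link Filter} and the ordinals of the projected
+ * fields; each call to {@link #enumerator()} creates a new
+ * {@link ArrowEnumerator} over the reader's batches.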
+ */ +class ArrowEnumerable extends AbstractEnumerable { + private final ArrowFileReader arrowFileReader; + private final @Nullable Projector projector; + private final @Nullable Filter filter; + private final ImmutableIntList fields; + + ArrowEnumerable(ArrowFileReader arrowFileReader, + @Nullable Projector projector, @Nullable Filter filter, + ImmutableIntList fields) { + this.arrowFileReader = arrowFileReader; + this.projector = projector; + this.filter = filter; + this.fields = fields; + } + + @Override public Enumerator enumerator() { + try { + return new ArrowEnumerator(projector, filter, fields, arrowFileReader); + } catch (Exception e) { + throw Util.toUnchecked(e); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerator.java new file mode 100644 index 00000000000..d8f4670e1bb --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerator.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Filter; +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.gandiva.evaluator.SelectionVector; +import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Enumerator that reads from a collection of Arrow value-vectors. 
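+ *
+ * <p>Batches are loaded lazily from the {@link ArrowFileReader}. When a
+ * Gandiva {@link Projector} is supplied, each batch is evaluated into the
+ * projected value vectors; when a {@link Filter} is supplied, each batch is
+ * evaluated into a {@link SelectionVector} and only the selected row
+ * indices are returned.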
+ */ +class ArrowEnumerator implements Enumerator { + private final BufferAllocator allocator; + private final ArrowFileReader arrowFileReader; + private final @Nullable Projector projector; + private final @Nullable Filter filter; + private int rowIndex = -1; + private final List valueVectors; + private final List fields; + private int rowSize; + private @Nullable ArrowBuf buf; + private @Nullable SelectionVector selectionVector; + private int selectionVectorIndex = 0; + + ArrowEnumerator(@Nullable Projector projector, @Nullable Filter filter, + ImmutableIntList fields, ArrowFileReader arrowFileReader) { + this.allocator = new RootAllocator(Long.MAX_VALUE); + this.projector = projector; + this.filter = filter; + this.arrowFileReader = arrowFileReader; + this.fields = fields; + this.valueVectors = new ArrayList<>(fields.size()); + + // Set up fields so that first call to moveNext() will trigger a call to + // loadNextBatch(). + if (projector != null) { + rowIndex = rowSize = 0; + } else { + selectionVector = null; + selectionVectorIndex = 0; + } + } + + @Override public Object current() { + if (fields.size() == 1) { + return this.valueVectors.get(0).getObject(rowIndex); + } + Object[] current = new Object[valueVectors.size()]; + for (int i = 0; i < valueVectors.size(); i++) { + ValueVector vector = this.valueVectors.get(i); + current[i] = vector.getObject(rowIndex); + } + return current; + } + + @Override public boolean moveNext() { + if (projector != null) { + if (rowIndex >= rowSize - 1) { + final boolean hasNextBatch; + try { + hasNextBatch = arrowFileReader.loadNextBatch(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + if (hasNextBatch) { + rowIndex = 0; + this.valueVectors.clear(); + loadNextArrowBatch(); + return true; + } else { + return false; + } + } else { + rowIndex++; + return true; + } + } else { + if (selectionVector == null + || selectionVectorIndex >= selectionVector.getRecordCount()) { + boolean hasNextBatch; + while (true) { + try { + hasNextBatch = arrowFileReader.loadNextBatch(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + if (hasNextBatch) { + selectionVectorIndex = 0; + this.valueVectors.clear(); + loadNextArrowBatch(); + assert selectionVector != null; + if (selectionVectorIndex >= selectionVector.getRecordCount()) { + // the "filtered" batch is empty, but there may be more batches to fetch + continue; + } + rowIndex = selectionVector.getIndex(selectionVectorIndex++); + return true; + } else { + return false; + } + } + } else { + rowIndex = selectionVector.getIndex(selectionVectorIndex++); + return true; + } + } + } + + private void loadNextArrowBatch() { + try { + final VectorSchemaRoot vsr = arrowFileReader.getVectorSchemaRoot(); + for (int i : fields) { + this.valueVectors.add(vsr.getVector(i)); + } + this.rowSize = vsr.getRowCount(); + VectorUnloader vectorUnloader = new VectorUnloader(vsr); + ArrowRecordBatch arrowRecordBatch = vectorUnloader.getRecordBatch(); + if (projector != null) { + projector.evaluate(arrowRecordBatch, valueVectors); + } + if (filter != null) { + this.buf = this.allocator.buffer(rowSize * 2); + this.selectionVector = new SelectionVectorInt16(buf); + filter.evaluate(arrowRecordBatch, selectionVector); + } + } catch (IOException | GandivaException e) { + throw Util.toUnchecked(e); + } + } + + @Override public void reset() { + throw new UnsupportedOperationException(); + } + + @Override public void close() { + try { + if (projector != null) { + projector.close(); + } + if (filter != null) { + if (buf != 
null) { + buf.close(); + } + filter.close(); + } + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java new file mode 100644 index 00000000000..e6cb6d1823f --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.rel.type.RelDataType; + +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Arrow field type. + */ +enum ArrowFieldType { + INT(Primitive.INT), + BOOLEAN(Primitive.BOOLEAN), + STRING(String.class), + FLOAT(Primitive.FLOAT), + DOUBLE(Primitive.DOUBLE), + DATE(Date.class), + LIST(List.class), + DECIMAL(BigDecimal.class), + LONG(Primitive.LONG), + BYTE(Primitive.BYTE), + SHORT(Primitive.SHORT); + + private final Class clazz; + + ArrowFieldType(Primitive primitive) { + this(requireNonNull(primitive.boxClass, "boxClass")); + } + + ArrowFieldType(Class clazz) { + this.clazz = clazz; + } + + public RelDataType toType(JavaTypeFactory typeFactory) { + RelDataType javaType = typeFactory.createJavaType(clazz); + RelDataType sqlType = typeFactory.createSqlType(javaType.getSqlTypeName()); + return typeFactory.createTypeWithNullability(sqlType, true); + } + + public static ArrowFieldType of(ArrowType arrowType) { + switch (arrowType.getTypeID()) { + case Int: + int bitWidth = ((ArrowType.Int) arrowType).getBitWidth(); + switch (bitWidth) { + case 64: + return LONG; + case 32: + return INT; + case 16: + return SHORT; + case 8: + return BYTE; + default: + throw new RuntimeException("Unsupported Int bit width: " + bitWidth); + } + case Bool: + return BOOLEAN; + case Utf8: + return STRING; + case FloatingPoint: + FloatingPointPrecision precision = ((ArrowType.FloatingPoint) arrowType).getPrecision(); + switch (precision) { + case SINGLE: + return FLOAT; + case DOUBLE: + return DOUBLE; + default: + throw new RuntimeException("Unsupported Floating point precision: " + precision); + } + case Date: + return DATE; + case Decimal: + return DECIMAL; + default: + throw new RuntimeException("Unsupported type: " + arrowType); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java new file mode 
100644 index 00000000000..6ca169bfb17 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of a {@link org.apache.calcite.rel.core.Filter} + * relational expression in Arrow. + */ +class ArrowFilter extends Filter implements ArrowRel { + private final List match; + + ArrowFilter(RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + RexNode condition) { + super(cluster, traitSet, input, condition); + + final ArrowTranslator translator = + ArrowTranslator.create(cluster.getRexBuilder(), input.getRowType()); + this.match = translator.translateMatch(condition); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + final RelOptCost cost = super.computeSelfCost(planner, mq); + return requireNonNull(cost, "cost").multiplyBy(0.1); + } + + @Override public ArrowFilter copy(RelTraitSet traitSet, RelNode input, + RexNode condition) { + return new ArrowFilter(getCluster(), traitSet, input, condition); + } + + @Override public void implement(Implementor implementor) { + implementor.visitInput(0, getInput()); + implementor.add(null, match); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java new file mode 100644 index 00000000000..9ee0882c323 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.util.ImmutableIntList; + +import com.google.common.collect.ImmutableMap; + +import java.lang.reflect.Method; +import java.util.List; + +/** + * Built-in methods in the Arrow adapter. + * + * @see org.apache.calcite.util.BuiltInMethod + */ +@SuppressWarnings("ImmutableEnumChecker") +enum ArrowMethod { + ARROW_QUERY(ArrowTable.class, "query", DataContext.class, + ImmutableIntList.class, List.class); + + final Method method; + + static final ImmutableMap MAP; + + static { + final ImmutableMap.Builder builder = + ImmutableMap.builder(); + for (ArrowMethod value : ArrowMethod.values()) { + builder.put(value.method, value); + } + MAP = builder.build(); + } + + /** Defines a method. */ + ArrowMethod(Class clazz, String methodName, Class... argumentTypes) { + this.method = Types.lookupMethod(clazz, methodName, argumentTypes); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java new file mode 100644 index 00000000000..770f12244e5 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Project} + * relational expression in Arrow. + */ +class ArrowProject extends Project implements ArrowRel { + + /** Creates an ArrowProject. 
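+   *
+   * <p>The projection is expected to consist of simple field references
+   * only; more complex expressions are rejected by
+   * {@link ArrowRules.ArrowProjectRule} (see {@link #getProjectFields(List)}).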
*/ + ArrowProject(RelOptCluster cluster, RelTraitSet traitSet, + RelNode input, List projects, RelDataType rowType) { + super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of()); + assert getConvention() == ArrowRel.CONVENTION; + assert getConvention() == input.getConvention(); + } + + @Override public Project copy(RelTraitSet traitSet, RelNode input, + List projects, RelDataType rowType) { + return new ArrowProject(getCluster(), traitSet, input, projects, + rowType); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + final RelOptCost cost = super.computeSelfCost(planner, mq); + return requireNonNull(cost, "cost").multiplyBy(0.1); + } + + @Override public void implement(Implementor implementor) { + implementor.visitInput(0, getInput()); + implementor.add(getProjectFields(getProjects()), null); + } + + static @Nullable List getProjectFields(List exps) { + final List fields = new ArrayList<>(); + for (final RexNode exp : exps) { + if (exp instanceof RexInputRef) { + fields.add(((RexInputRef) exp).getIndex()); + } else { + return null; // not a simple projection + } + } + return fields; + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java new file mode 100644 index 00000000000..1576acf4d54 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.util.ImmutableIntList; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * Relational expression that uses Arrow calling convention. + */ +public interface ArrowRel extends RelNode { + void implement(Implementor implementor); + + /** Calling convention for relational operations that occur in Arrow. */ + Convention CONVENTION = new Convention.Impl("ARROW", ArrowRel.class); + + /** Callback for the implementation process that converts a tree of + * {@link ArrowRel} nodes into a SQL query. */ + class Implementor { + @Nullable List selectFields; + final List whereClause = new ArrayList<>(); + @Nullable RelOptTable table; + @Nullable ArrowTable arrowTable; + + /** Adds newly projected fields and restricted predicates. 
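+   * Projected fields become the query's {@code selectFields}; predicates
+   * are appended to the {@code whereClause} that is later translated into
+   * a Gandiva condition.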
+ * + * @param fields New fields to be projected from a query + * @param predicates Predicates + */ + void add(@Nullable List fields, @Nullable List predicates) { + if (fields != null) { + selectFields = ImmutableIntList.copyOf(fields); + } + if (predicates != null) { + whereClause.addAll(predicates); + } + } + + public void visitInput(int ordinal, RelNode input) { + assert ordinal == 0; + ((ArrowRel) input).implement(this); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java new file mode 100644 index 00000000000..ff3762d59fa --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.validate.SqlValidatorUtil; + +import com.google.common.collect.ImmutableList; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.List; + +/** Planner rules relating to the Arrow adapter. */ +public class ArrowRules { + private ArrowRules() {} + + /** Rule that matches a {@link org.apache.calcite.rel.core.Project} on + * an {@link ArrowTableScan} and pushes down projects if possible. */ + public static final ArrowProjectRule PROJECT_SCAN = + ArrowProjectRule.DEFAULT_CONFIG.toRule(ArrowProjectRule.class); + + public static final ArrowFilterRule FILTER_SCAN = + ArrowFilterRule.Config.DEFAULT.toRule(); + + public static final ArrowToEnumerableConverterRule TO_ENUMERABLE = + ArrowToEnumerableConverterRule.DEFAULT_CONFIG + .toRule(ArrowToEnumerableConverterRule.class); + + public static final List RULES = + ImmutableList.of(PROJECT_SCAN, + FILTER_SCAN); + + static List arrowFieldNames(final RelDataType rowType) { + return SqlValidatorUtil.uniquify(rowType.getFieldNames(), + SqlValidatorUtil.EXPR_SUGGESTER, true); + } + + /** Base class for planner rules that convert a relational expression to + * Arrow calling convention. 
*/ + abstract static class ArrowConverterRule extends ConverterRule { + ArrowConverterRule(Config config) { + super(config); + } + } + + /** + * Rule to convert a {@link org.apache.calcite.rel.core.Filter} to an + * {@link ArrowFilter}. + */ + public static class ArrowFilterRule extends RelRule { + + /** Creates an ArrowFilterRule. */ + protected ArrowFilterRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + final Filter filter = call.rel(0); + + if (filter.getTraitSet().contains(Convention.NONE)) { + final RelNode converted = convert(filter); + call.transformTo(converted); + } + } + + RelNode convert(Filter filter) { + final RelTraitSet traitSet = + filter.getTraitSet().replace(ArrowRel.CONVENTION); + return new ArrowFilter(filter.getCluster(), traitSet, + convert(filter.getInput(), ArrowRel.CONVENTION), + filter.getCondition()); + } + + /** Rule configuration. */ + @Value.Immutable + public interface Config extends RelRule.Config { + Config DEFAULT = ImmutableConfig.builder() + .withOperandSupplier(b0 -> + b0.operand(LogicalFilter.class).oneInput(b1 -> + b1.operand(ArrowTableScan.class).noInputs())) + .build(); + + @Override default ArrowFilterRule toRule() { + return new ArrowFilterRule(this); + } + } + } + + /** + * Planner rule that projects from an {@link ArrowTableScan} just the columns + * needed to satisfy a projection. If the projection's expressions are + * trivial, the projection is removed. + * + * @see ArrowRules#PROJECT_SCAN + */ + public static class ArrowProjectRule extends ArrowConverterRule { + + /** Default configuration. */ + protected static final Config DEFAULT_CONFIG = Config.INSTANCE + .withConversion(LogicalProject.class, Convention.NONE, + ArrowRel.CONVENTION, "ArrowProjectRule") + .withRuleFactory(ArrowProjectRule::new); + + /** Creates an ArrowProjectRule. */ + protected ArrowProjectRule(Config config) { + super(config); + } + + @Override public @Nullable RelNode convert(RelNode rel) { + final Project project = (Project) rel; + @Nullable List fields = + ArrowProject.getProjectFields(project.getProjects()); + if (fields == null) { + // Project contains expressions more complex than just field references. + return null; + } + final RelTraitSet traitSet = + project.getTraitSet().replace(ArrowRel.CONVENTION); + return new ArrowProject(project.getCluster(), traitSet, + convert(project.getInput(), ArrowRel.CONVENTION), + project.getProjects(), project.getRowType()); + } + } + + /** + * Rule to convert a relational expression from + * {@link ArrowRel#CONVENTION} to {@link EnumerableConvention}. + */ + static class ArrowToEnumerableConverterRule extends ConverterRule { + + /** Default configuration. */ + public static final Config DEFAULT_CONFIG = Config.INSTANCE + .withConversion(RelNode.class, ArrowRel.CONVENTION, + EnumerableConvention.INSTANCE, "ArrowToEnumerableConverterRule") + .withRuleFactory(ArrowToEnumerableConverterRule::new); + + /** Creates an ArrowToEnumerableConverterRule. 
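+     *
+     * <p>The rule is added to the planner by
+     * {@link ArrowTableScan#register}, along with the rules in
+     * {@link ArrowRules#RULES}.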
*/ + protected ArrowToEnumerableConverterRule(Config config) { + super(config); + } + + @Override public RelNode convert(RelNode rel) { + RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention()); + return new ArrowToEnumerableConverter(rel.getCluster(), newTraitSet, rel); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java new file mode 100644 index 00000000000..3ba6a4e97fe --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.util.Sources; +import org.apache.calcite.util.Util; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.SeekableReadChannel; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** + * Schema mapped onto a set of Arrow files. + */ +class ArrowSchema extends AbstractSchema { + private final Supplier> tableMapSupplier; + + /** + * Creates an Arrow schema. + * + * @param baseDirectory Base directory to look for relative files + */ + ArrowSchema(File baseDirectory) { + requireNonNull(baseDirectory, "baseDirectory"); + this.tableMapSupplier = + Suppliers.memoize(() -> deduceTableMap(baseDirectory)); + } + + /** + * Looks for a suffix on a string and returns + * either the string with the suffix removed + * or the original string. + */ + private static String trim(String s, String suffix) { + String trimmed = trimOrNull(s, suffix); + return trimmed != null ? trimmed : s; + } + + /** + * Looks for a suffix on a string and returns + * either the string with the suffix removed + * or null. + */ + private static @Nullable String trimOrNull(String s, String suffix) { + return s.endsWith(suffix) + ? 
s.substring(0, s.length() - suffix.length()) + : null; + } + + @Override protected Map getTableMap() { + return tableMapSupplier.get(); + } + + private static Map deduceTableMap(File baseDirectory) { + File[] files = baseDirectory.listFiles((dir, name) -> name.endsWith(".arrow")); + if (files == null) { + System.out.println("directory " + baseDirectory + " not found"); + return ImmutableMap.of(); + } + + final Map tables = new HashMap<>(); + for (File file : files) { + final File arrowFile = new File(Sources.of(file).path()); + final FileInputStream fileInputStream; + try { + fileInputStream = new FileInputStream(arrowFile); + } catch (FileNotFoundException e) { + throw Util.toUnchecked(e); + } + final SeekableReadChannel seekableReadChannel = + new SeekableReadChannel(fileInputStream.getChannel()); + final RootAllocator allocator = new RootAllocator(); + final ArrowFileReader arrowFileReader = + new ArrowFileReader(seekableReadChannel, allocator); + final String tableName = + trim(file.getName(), ".arrow").toUpperCase(Locale.ROOT); + final ArrowTable table = + new ArrowTable(null, arrowFileReader); + tables.put(tableName, table); + } + + return ImmutableMap.copyOf(tables); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java new file mode 100644 index 00000000000..b0be6688fd6 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaFactory; +import org.apache.calcite.schema.SchemaPlus; + +import java.io.File; +import java.util.Map; + +/** + * Factory that creates an {@link ArrowSchema}. 
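+ *
+ * <p>The factory is typically used from a model file; for example (the
+ * directory name below is illustrative only):
+ *
+ * <blockquote><pre>
+ *   {
+ *     "name": "ARROW",
+ *     "type": "custom",
+ *     "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory",
+ *     "operand": {
+ *       "directory": "arrow-data"
+ *     }
+ *   }
+ * </pre></blockquote>
+ *
+ * <p>A relative directory is resolved against the model's base directory.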
+ */ +public class ArrowSchemaFactory implements SchemaFactory { + + @Override public Schema create(SchemaPlus parentSchema, String name, + Map operand) { + File baseDirectory = + (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName); + final String directory = (String) operand.get("directory"); + File directoryFile = null; + if (directory != null) { + directoryFile = new File(directory); + } + if (baseDirectory != null) { + if (directoryFile == null) { + directoryFile = baseDirectory; + } else if (!directoryFile.isAbsolute()) { + directoryFile = new File(baseDirectory, directoryFile.getPath()); + } + } + if (directoryFile == null) { + throw new RuntimeException("no directory"); + } + return new ArrowSchema(directoryFile); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java new file mode 100644 index 00000000000..f0931153fed --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Filter; +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.gandiva.expression.Condition; +import org.apache.arrow.gandiva.expression.ExpressionTree; +import org.apache.arrow.gandiva.expression.TreeBuilder; +import org.apache.arrow.gandiva.expression.TreeNode; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Arrow Table. + */ +public class ArrowTable extends AbstractTable + implements TranslatableTable, QueryableTable { + private final @Nullable RelProtoDataType protoRowType; + /** Arrow schema. (In Calcite terminology, more like a row type than a + * Schema.) */ + private final Schema schema; + final ArrowFileReader arrowFileReader; + + ArrowTable(@Nullable RelProtoDataType protoRowType, + ArrowFileReader arrowFileReader) { + try { + this.schema = arrowFileReader.getVectorSchemaRoot().getSchema(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + this.protoRowType = protoRowType; + this.arrowFileReader = arrowFileReader; + } + + @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { + if (this.protoRowType != null) { + return this.protoRowType.apply(typeFactory); + } + return deduceRowType(this.schema, (JavaTypeFactory) typeFactory); + } + + @Override public Expression getExpression(SchemaPlus schema, String tableName, + Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + /** Called via code generation; see uses of + * {@link org.apache.calcite.adapter.arrow.ArrowMethod#ARROW_QUERY}. 
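+   *
+   * <p>If no conditions are given, a Gandiva {@link Projector} is built over
+   * the requested fields; otherwise a Gandiva {@link Filter} is built from
+   * the condition strings (each of the form {@code "name op value type"}, as
+   * produced by {@link ArrowTranslator}).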
*/ + @SuppressWarnings("unused") + public Enumerable query(DataContext root, ImmutableIntList fields, + List conditions) { + requireNonNull(fields, "fields"); + final Projector projector; + final Filter filter; + + if (conditions.size() == 0) { + filter = null; + + final List expressionTrees = new ArrayList<>(); + for (int fieldOrdinal : fields) { + Field field = schema.getFields().get(fieldOrdinal); + TreeNode node = TreeBuilder.makeField(field); + expressionTrees.add(TreeBuilder.makeExpression(node, field)); + } + try { + projector = Projector.make(schema, expressionTrees); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } else { + projector = null; + + final List conditionNodes = new ArrayList<>(conditions.size()); + for (String condition : conditions) { + String[] data = condition.split(" "); + List treeNodes = new ArrayList<>(2); + treeNodes.add( + TreeBuilder.makeField(schema.getFields() + .get(schema.getFields().indexOf(schema.findField(data[0]))))); + treeNodes.add(makeLiteralNode(data[2], data[3])); + String equality = data[1]; + conditionNodes.add( + TreeBuilder.makeFunction(equality, treeNodes, new ArrowType.Bool())); + } + final Condition filterCondition; + if (conditionNodes.size() == 1) { + filterCondition = TreeBuilder.makeCondition(conditionNodes.get(0)); + } else { + TreeNode treeNode = TreeBuilder.makeAnd(conditionNodes); + filterCondition = TreeBuilder.makeCondition(treeNode); + } + + try { + filter = Filter.make(schema, filterCondition); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } + + return new ArrowEnumerable(arrowFileReader, projector, filter, fields); + } + + @Override public Queryable asQueryable(QueryProvider queryProvider, + SchemaPlus schema, String tableName) { + throw new UnsupportedOperationException(); + } + + @Override public Type getElementType() { + return Object[].class; + } + + @Override public RelNode toRel(RelOptTable.ToRelContext context, + RelOptTable relOptTable) { + final int fieldCount = relOptTable.getRowType().getFieldCount(); + final ImmutableIntList fields = + ImmutableIntList.copyOf(Util.range(fieldCount)); + final RelOptCluster cluster = context.getCluster(); + return new ArrowTableScan(cluster, cluster.traitSetOf(ArrowRel.CONVENTION), + relOptTable, this, fields); + } + + private static RelDataType deduceRowType(Schema schema, + JavaTypeFactory typeFactory) { + final RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (Field field : schema.getFields()) { + builder.add(field.getName(), + ArrowFieldType.of(field.getType()).toType(typeFactory)); + } + return builder.build(); + } + + private static TreeNode makeLiteralNode(String literal, String type) { + switch (type) { + case "integer": + return TreeBuilder.makeLiteral(Integer.parseInt(literal)); + case "long": + return TreeBuilder.makeLiteral(Long.parseLong(literal)); + case "float": + return TreeBuilder.makeLiteral(Float.parseFloat(literal)); + case "double": + return TreeBuilder.makeLiteral(Double.parseDouble(literal)); + case "string": + return TreeBuilder.makeStringLiteral(literal.substring(1, literal.length() - 1)); + default: + throw new AssertionError("Invalid literal " + literal + + ", type " + type); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java new file mode 100644 index 00000000000..a589cf04358 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java @@ -0,0 
+1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.util.ImmutableIntList; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * Relational expression representing a scan of an Arrow collection. + */ +class ArrowTableScan extends TableScan implements ArrowRel { + final ArrowTable arrowTable; + private final ImmutableIntList fields; + + ArrowTableScan(RelOptCluster cluster, RelTraitSet traitSet, + RelOptTable relOptTable, ArrowTable arrowTable, ImmutableIntList fields) { + super(cluster, traitSet, ImmutableList.of(), relOptTable); + this.arrowTable = arrowTable; + this.fields = fields; + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + assert inputs.isEmpty(); + return this; + } + + @Override public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("fields", fields); + } + + @Override public RelDataType deriveRowType() { + final List fieldList = table.getRowType().getFieldList(); + final RelDataTypeFactory.Builder builder = + getCluster().getTypeFactory().builder(); + for (int field : fields) { + builder.add(fieldList.get(field)); + } + return builder.build(); + } + + @Override public void register(RelOptPlanner planner) { + planner.addRule(ArrowRules.TO_ENUMERABLE); + for (RelOptRule rule : ArrowRules.RULES) { + planner.addRule(rule); + } + } + + @Override public void implement(ArrowRel.Implementor implementor) { + implementor.arrowTable = arrowTable; + implementor.table = table; + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java new file mode 100644 index 00000000000..5e8e0a710ef --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.BuiltInMethod; + +import com.google.common.primitives.Ints; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Relational expression representing a scan of a table in an Arrow data source. + */ +class ArrowToEnumerableConverter + extends ConverterImpl implements EnumerableRel { + + protected ArrowToEnumerableConverter(RelOptCluster cluster, + RelTraitSet traitSet, RelNode input) { + super(cluster, ConventionTraitDef.INSTANCE, traitSet, input); + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + return new ArrowToEnumerableConverter(getCluster(), traitSet, sole(inputs)); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + RelOptCost cost = super.computeSelfCost(planner, mq); + return requireNonNull(cost, "cost").multiplyBy(.1); + } + + @Override public Result implement(EnumerableRelImplementor implementor, + Prefer pref) { + final ArrowRel.Implementor arrowImplementor = new ArrowRel.Implementor(); + arrowImplementor.visitInput(0, getInput()); + PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + getRowType(), + pref.preferArray()); + + final RelOptTable table = requireNonNull(arrowImplementor.table, "table"); + final int fieldCount = table.getRowType().getFieldCount(); + return implementor.result(physType, + Blocks.toBlock( + Expressions.call(table.getExpression(ArrowTable.class), + ArrowMethod.ARROW_QUERY.method, implementor.getRootExpression(), + arrowImplementor.selectFields != null + ? 
Expressions.call( + BuiltInMethod.IMMUTABLE_INT_LIST_COPY_OF.method, + Expressions.constant( + Ints.toArray(arrowImplementor.selectFields))) + : Expressions.call( + BuiltInMethod.IMMUTABLE_INT_LIST_IDENTITY.method, + Expressions.constant(fieldCount)), + Expressions.constant(arrowImplementor.whereClause)))); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java new file mode 100644 index 00000000000..859dfa0bafb --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.DateString; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.math.BigDecimal; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.calcite.util.DateTimeStringUtils.ISO_DATETIME_FRACTIONAL_SECOND_FORMAT; +import static org.apache.calcite.util.DateTimeStringUtils.getDateFormatter; + +import static java.util.Objects.requireNonNull; + +/** + * Translates a {@link RexNode} expression to a Gandiva string. + */ +class ArrowTranslator { + final RexBuilder rexBuilder; + final RelDataType rowType; + final List fieldNames; + + /** Private constructor. */ + ArrowTranslator(RexBuilder rexBuilder, RelDataType rowType) { + this.rexBuilder = rexBuilder; + this.rowType = rowType; + this.fieldNames = ArrowRules.arrowFieldNames(rowType); + } + + /** Creates an ArrowTranslator. */ + public static ArrowTranslator create(RexBuilder rexBuilder, + RelDataType rowType) { + return new ArrowTranslator(rexBuilder, rowType); + } + + List translateMatch(RexNode condition) { + List disjunctions = RelOptUtil.disjunctions(condition); + if (disjunctions.size() == 1) { + return translateAnd(disjunctions.get(0)); + } else { + throw new AssertionError("cannot translate " + condition); + } + } + + /** + * Returns the value of the literal. 
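+   * Timestamps are rendered as ISO date-time strings, dates via
+   * {@link DateString#toString()}, and all other literals via
+   * {@link RexLiteral#getValue3()}.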
+ * + * @param literal Literal to translate + * + * @return The value of the literal in the form of the actual type + */ + private static Object literalValue(RexLiteral literal) { + switch (literal.getTypeName()) { + case TIMESTAMP: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final SimpleDateFormat dateFormatter = + getDateFormatter(ISO_DATETIME_FRACTIONAL_SECOND_FORMAT); + Long millis = literal.getValueAs(Long.class); + return dateFormatter.format(requireNonNull(millis, "millis")); + case DATE: + final DateString dateString = literal.getValueAs(DateString.class); + return requireNonNull(dateString, "dateString").toString(); + default: + return requireNonNull(literal.getValue3()); + } + } + + /** + * Translate a conjunctive predicate to a SQL string. + * + * @param condition A conjunctive predicate + * + * @return SQL string for the predicate + */ + private List translateAnd(RexNode condition) { + List predicates = new ArrayList<>(); + for (RexNode node : RelOptUtil.conjunctions(condition)) { + switch (node.getKind()) { + case SEARCH: + final RexNode node2 = RexUtil.expandSearch(rexBuilder, null, node); + predicates.addAll(translateMatch(node2)); + break; + default: + predicates.add(translateMatch2(node)); + } + } + return predicates; + } + + /** Translate a binary relation. */ + private String translateMatch2(RexNode node) { + switch (node.getKind()) { + case EQUALS: + return translateBinary("equal", "=", (RexCall) node); + case LESS_THAN: + return translateBinary("less_than", ">", (RexCall) node); + case LESS_THAN_OR_EQUAL: + return translateBinary("less_than_or_equal_to", ">=", (RexCall) node); + case GREATER_THAN: + return translateBinary("greater_than", "<", (RexCall) node); + case GREATER_THAN_OR_EQUAL: + return translateBinary("greater_than_or_equal_to", "<=", (RexCall) node); + default: + throw new AssertionError("cannot translate " + node); + } + } + + /** + * Translates a call to a binary operator, reversing arguments if + * necessary. + */ + private String translateBinary(String op, String rop, RexCall call) { + final RexNode left = call.operands.get(0); + final RexNode right = call.operands.get(1); + @Nullable String expression = translateBinary2(op, left, right); + if (expression != null) { + return expression; + } + expression = translateBinary2(rop, right, left); + if (expression != null) { + return expression; + } + throw new AssertionError("cannot translate op " + op + " call " + call); + } + + /** Translates a call to a binary operator. Returns null on failure. */ + private @Nullable String translateBinary2(String op, RexNode left, RexNode right) { + switch (right.getKind()) { + case LITERAL: + break; + default: + return null; + } + final RexLiteral rightLiteral = (RexLiteral) right; + switch (left.getKind()) { + case INPUT_REF: + final RexInputRef left1 = (RexInputRef) left; + String name = fieldNames.get(left1.getIndex()); + return translateOp2(op, name, rightLiteral); + case CAST: + // FIXME This will not work in all cases (for example, we ignore string encoding) + return translateBinary2(op, ((RexCall) left).operands.get(0), right); + default: + return null; + } + } + + /** Combines a field name, operator, and literal to produce a predicate string. 
*/ + private String translateOp2(String op, String name, RexLiteral right) { + // In case this is a key, record that it is now restricted + + Object value = literalValue(right); + String valueString = value.toString(); + String valueType = getLiteralType(value); + + if (value instanceof String) { + final RelDataTypeField field = requireNonNull(rowType.getField(name, true, false), "field"); + SqlTypeName typeName = field.getType().getSqlTypeName(); + if (typeName != SqlTypeName.CHAR) { + valueString = "'" + valueString + "'"; + } + } + return name + " " + op + " " + valueString + " " + valueType; + } + + private static String getLiteralType(Object literal) { + if (literal instanceof BigDecimal) { + BigDecimal bigDecimalLiteral = (BigDecimal) literal; + int scale = bigDecimalLiteral.scale(); + if (scale == 0) { + return "integer"; + } else if (scale > 0) { + return "float"; + } + } else if (String.class.equals(literal.getClass())) { + return "string"; + } + throw new AssertionError("Invalid literal"); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java new file mode 100644 index 00000000000..51ced6b206b --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Query provider that reads from Arrow files. + */ +package org.apache.calcite.adapter.arrow; diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java new file mode 100644 index 00000000000..526b4584220 --- /dev/null +++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.Table; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.util.Sources; + +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.gandiva.expression.ExpressionTree; +import org.apache.arrow.vector.types.pojo.Schema; + +import com.google.common.collect.ImmutableMap; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * Tests for the Apache Arrow adapter. + */ +class ArrowAdapterTest { + private static Map arrow; + private static File arrowDataDirectory; + private static boolean hasGandivaSupport = detectGandivaSupport(); + + ArrowAdapterTest() { + assumeTrue(hasGandivaSupport, "gandiva not supported on this platform, skipping tests"); + } + + /** + * Gandiva (used to implement arrow filtering / projection) does not currently distribute + * a binary that is compatible with M1 macs on maven central. + * see ARROW-16608. + * + * @return true if we believe that gandiva is supported on this platform and we can run the tests + */ + private static boolean detectGandivaSupport() { + try { + Schema emptySchema = new Schema(new ArrayList<>(), null); + List expressions = new ArrayList<>(); + Projector.make(emptySchema, expressions); + } catch (GandivaException e) { + // this is ok -- we'll always hit this because of an empty expression + // the fact that we got here, is indicative that the JNI library was loaded properly + return true; + } catch (UnsatisfiedLinkError e) { + return false; + } + return true; + } + + @BeforeAll + static void initializeArrowState(@TempDir Path sharedTempDir) throws IOException, SQLException { + URL modelUrl = + Objects.requireNonNull(ArrowAdapterTest.class.getResource("/arrow-model.json"), "url"); + Path sourceModelFilePath = Sources.of(modelUrl).file().toPath(); + Path modelFileTarget = sharedTempDir.resolve("arrow-model.json"); + Files.copy(sourceModelFilePath, modelFileTarget); + + Path arrowFilesDirectory = sharedTempDir.resolve("arrow"); + Files.createDirectory(arrowFilesDirectory); + arrowDataDirectory = arrowFilesDirectory.toFile(); + + File dataLocationFile = arrowFilesDirectory.resolve("arrowdata.arrow").toFile(); + ArrowData arrowDataGenerator = new ArrowData(); + arrowDataGenerator.writeArrowData(dataLocationFile); + arrowDataGenerator.writeScottEmpData(arrowFilesDirectory); + + arrow = ImmutableMap.of("model", modelFileTarget.toAbsolutePath().toString()); + } + + /** Test to read an Arrow file and check its field names. 
*/ + @Test void testArrowSchema() { + ArrowSchema arrowSchema = new ArrowSchema(arrowDataDirectory); + Map tableMap = arrowSchema.getTableMap(); + RelDataTypeFactory typeFactory = + new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataType relDataType = tableMap.get("ARROWDATA").getRowType(typeFactory); + + assertThat(relDataType.getFieldNames().get(0), is("intField")); + assertThat(relDataType.getFieldNames().get(1), is("stringField")); + assertThat(relDataType.getFieldNames().get(2), is("floatField")); + } + + @Test void testArrowProjectAllFields() { + String sql = "select * from arrowdata\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0; stringField=0; floatField=0.0; longField=0\n" + + "intField=1; stringField=1; floatField=1.0; longField=1\n" + + "intField=2; stringField=2; floatField=2.0; longField=2\n" + + "intField=3; stringField=3; floatField=3.0; longField=3\n" + + "intField=4; stringField=4; floatField=4.0; longField=4\n" + + "intField=5; stringField=5; floatField=5.0; longField=5\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectSingleField() { + String sql = "select \"intField\" from arrowdata\n"; + String result = "intField=0\nintField=1\nintField=2\n" + + "intField=3\nintField=4\nintField=5\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectTwoFields() { + String sql = "select \"intField\", \"stringField\" from arrowdata\n"; + String result = "intField=0; stringField=0\n" + + "intField=1; stringField=1\n" + + "intField=2; stringField=2\n" + + "intField=3; stringField=3\n" + + "intField=4; stringField=4\n" + + "intField=5; stringField=5\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithIntegerFilter() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\" < 4"; + String result = "intField=0; stringField=0\n" + + "intField=1; stringField=1\n" + + "intField=2; stringField=2\n" + + "intField=3; stringField=3\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[<($0, 4)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(4) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithMultipleFilters() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\"=12 and \"stringField\"='12'"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[AND(=($0, 12), =($1, '12'))])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=12; stringField=12\n"; + CalciteAssert.that() + .with(arrow) 
+ .query(sql) + .limit(3) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithFloatFilter() { + String sql = "select * from arrowdata\n" + + " where \"floatField\"=15.0"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowFilter(condition=[=(CAST($2):DOUBLE, 15.0)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=15; stringField=15; floatField=15.0; longField=15\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithFilterOnLaterBatch() { + String sql = "select \"intField\"\n" + + "from arrowdata\n" + + "where \"intField\"=25"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0])\n" + + " ArrowFilter(condition=[=($0, 25)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=25\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + // TODO: test a table whose field names contain spaces, + // e.g. 'SELECT ... WHERE "my Field" > 5' + // (code generator does not seem to quote field names currently) + + // TODO: test a character literal that contains a single-quote + // (quoting of literals for Gandiva doesn't look safe to me) + + // TODO: test various data types (TINYINT, SMALLINT, INTEGER, BIGINT, REAL, + // FLOAT, DATE, TIME, TIMESTAMP, INTERVAL SECOND, INTERVAL MONTH, CHAR, + // VARCHAR, BINARY, VARBINARY, BOOLEAN) with and without NOT NULL + + @Test void testTinyIntProject() { + String sql = "select DEPTNO from DEPT"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(DEPTNO=[$0])\n" + + " ArrowTableScan(table=[[ARROW, DEPT]], fields=[[0, 1, 2]])\n\n"; + String result = "DEPTNO=10\nDEPTNO=20\nDEPTNO=30\nDEPTNO=40\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testSmallIntProject() { + String sql = "select EMPNO from EMP"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(EMPNO=[$0])\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + String result = "EMPNO=7369\nEMPNO=7499\nEMPNO=7521\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(3) + .returns(result) + .explainContains(plan); + } + + // TODO: test casts, including lossy casts + @Test void testCastDecimalToInt() { + String sql = "select CAST(LOSAL AS INT) as \"trunc\" from SALGRADE"; + String plan = + "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):INTEGER], trunc=[$t3])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n"; + String result = "trunc=700\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[trunc INTEGER]") + .limit(1) + .returns(result) + .explainContains(plan); + } + + @Test void testCastDecimalToFloat() { + String sql = "select CAST(LOSAL AS FLOAT) as \"extra\" from SALGRADE"; + String plan = "PLAN=EnumerableCalc(expr#0..2=[{inputs}]," + + " expr#3=[CAST($t1):FLOAT], extra=[$t3])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n"; + String result = "extra=700.0\nextra=1201.0\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[extra FLOAT]") + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testCastDecimalToDouble() { + String sql 
= "select CAST(LOSAL AS DOUBLE) as \"extra\" from SALGRADE"; + String plan = + "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):DOUBLE], extra=[$t3])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n"; + String result = "extra=700.0\nextra=1201.0\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[extra DOUBLE]") + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testCastIntToDouble() { + String sql = "select CAST(\"intField\" AS DOUBLE) as \"dbl\" from arrowdata"; + String result = "dbl=0.0\ndbl=1.0\n"; + String plan = + "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t0):DOUBLE], dbl=[$t4])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[dbl DOUBLE]") + .limit(2) + .returns(result) + .explainContains(plan); + } + + // TODO: test IS NULL, IS NOT NULL + + // TODO: test 3-valued boolean logic, e.g. 'WHERE (x > 5) IS NOT FALSE', + // 'SELECT (x > 5) ...' + + @Test void testStringOperation() { + String sql = "select\n" + + " \"stringField\" || '_suffix' as \"field1\"\n" + + "from arrowdata"; + String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=['_suffix'], " + + "expr#5=[||($t1, $t4)], field1=[$t5])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "field1=0_suffix\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(1) + .returns(result) + .explainContains(plan); + } + + // TODO: test a join + // (The implementor can only hold one table at a time, so I suspect this will + // have problems.) + + // TODO: test a union + // (The implementor can only hold one table at a time, so I suspect this will + // have problems.) + + // TODO: test a filter on a project on a filter on a project + // (The implementor may not be able to combine multiple selections and + // projectors. Also, in some cases the optimal plan would combine selections + // and projectors.) + + // TODO: test an OR condition + + // TODO: test an IN condition, e.g. 
x IN (1, 7, 8, 9) + + @Test void testAggWithoutAggFunctions() { + String sql = "select DISTINCT(\"intField\") as \"dep\" from arrowdata"; + String result = "dep=0\ndep=1\n"; + + String plan = "PLAN=EnumerableAggregate(group=[{0}])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testAggWithAggFunctions() { + String sql = "select JOB, SUM(SAL) as TOTAL from EMP GROUP BY JOB"; + String result = "JOB=SALESMAN; TOTAL=5600.00\nJOB=ANALYST; TOTAL=6000.00\n"; + + String plan = "PLAN=EnumerableAggregate(group=[{2}], TOTAL=[SUM($5)])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testFilteredAgg() { + // TODO add group by + String sql = "select SUM(SAL) FILTER (WHERE COMM > 400) as SALESSUM from EMP"; + String result = "SALESSUM=2500.00\n"; + + String plan = "PLAN=EnumerableAggregate(group=[{}], SALESSUM=[SUM($0) FILTER $1])\n" + + " EnumerableCalc(expr#0..7=[{inputs}], expr#8=[400], expr#9=[>($t6, $t8)], " + + "expr#10=[IS TRUE($t9)], SAL=[$t5], $f1=[$t10])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testAggGroupedByNullable() { + String sql = "select COMM, SUM(SAL) as SALESSUM from EMP GROUP BY COMM"; + String result = "COMM=0.00; SALESSUM=1500.00\n" + + "COMM=1400.00; SALESSUM=1250.00\n" + + "COMM=300.00; SALESSUM=1600.00\n" + + "COMM=500.00; SALESSUM=1250.00\n" + + "COMM=null; SALESSUM=23425.00"; + + String plan = "PLAN=EnumerableAggregate(group=[{6}], SALESSUM=[SUM($5)])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returnsUnordered("COMM=0.00; SALESSUM=1500.00", + "COMM=1400.00; SALESSUM=1250.00", + "COMM=300.00; SALESSUM=1600.00", + "COMM=500.00; SALESSUM=1250.00", + "COMM=null; SALESSUM=23425.00") + .explainContains(plan); + } + + @Test void testArrowAdapterLimitNoSort() { + String sql = "select \"intField\"\n" + + "from arrowdata\n" + + "limit 2"; + String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n" + + " EnumerableLimit(fetch=[2])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0\nintField=1\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowLimitOffsetNoSort() { + String sql = "select \"intField\"\n" + + "from arrowdata\n" + + "limit 2 offset 2"; + String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n" + + " EnumerableLimit(offset=[2], fetch=[2])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=2\nintField=3\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowSortOnLong() { + String sql = "select \"intField\" from arrowdata order by \"longField\" desc"; + String plan = 
"PLAN=EnumerableSort(sort0=[$1], dir0=[DESC])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], longField=[$3])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=49\nintField=48\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } +} diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java new file mode 100644 index 00000000000..3870bb2e115 --- /dev/null +++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; +import org.apache.arrow.adapter.jdbc.JdbcToArrow; +import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; +import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder; +import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FloatingPointVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; + +import com.google.common.collect.ImmutableList; + +import net.hydromatic.scott.data.hsqldb.ScottHsqldb; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Calendar; +import java.util.List; + +/** + * Class that can be used to generate Arrow sample data into a data directory. 
+ */ +public class ArrowData { + + private final int batchSize; + private final int entries; + private int intValue; + private int stringValue; + private float floatValue; + private long longValue; + + public ArrowData() { + this.batchSize = 20; + this.entries = 50; + this.intValue = 0; + this.stringValue = 0; + this.floatValue = 0; + this.longValue = 0; + } + + private Schema makeArrowSchema() { + ImmutableList.Builder childrenBuilder = ImmutableList.builder(); + FieldType intType = FieldType.nullable(new ArrowType.Int(32, true)); + FieldType stringType = FieldType.nullable(new ArrowType.Utf8()); + FieldType floatType = + FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)); + FieldType longType = FieldType.nullable(new ArrowType.Int(64, true)); + + childrenBuilder.add(new Field("intField", intType, null)); + childrenBuilder.add(new Field("stringField", stringType, null)); + childrenBuilder.add(new Field("floatField", floatType, null)); + childrenBuilder.add(new Field("longField", longType, null)); + + return new Schema(childrenBuilder.build(), null); + } + + public void writeScottEmpData(Path arrowDataDirectory) throws IOException, SQLException { + List tableNames = ImmutableList.of("EMP", "DEPT", "SALGRADE"); + + Connection connection = + DriverManager.getConnection(ScottHsqldb.URI, ScottHsqldb.USER, ScottHsqldb.PASSWORD); + + for (String tableName : tableNames) { + String sql = "SELECT * FROM " + tableName; + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + + Calendar calendar = JdbcToArrowUtils.getUtcCalendar(); + + RootAllocator rootAllocator = new RootAllocator(); + JdbcToArrowConfig config = new JdbcToArrowConfigBuilder() + .setAllocator(rootAllocator) + .setReuseVectorSchemaRoot(true) + .setCalendar(calendar) + .setTargetBatchSize(1024) + .build(); + + ArrowVectorIterator vectorIterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config); + Path tablePath = arrowDataDirectory.resolve(tableName + ".arrow"); + + FileOutputStream fileOutputStream = new FileOutputStream(tablePath.toFile()); + + VectorSchemaRoot vectorSchemaRoot = vectorIterator.next(); + + ArrowFileWriter arrowFileWriter = + new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel()); + + arrowFileWriter.start(); + arrowFileWriter.writeBatch(); + + while (vectorIterator.hasNext()) { + // refreshes the data in the VectorSchemaRoot with the next batch + vectorIterator.next(); + arrowFileWriter.writeBatch(); + } + + arrowFileWriter.close(); + } + } + + public void writeArrowData(File file) throws IOException { + FileOutputStream fileOutputStream = new FileOutputStream(file); + Schema arrowSchema = makeArrowSchema(); + VectorSchemaRoot vectorSchemaRoot = + VectorSchemaRoot.create(arrowSchema, new RootAllocator(Integer.MAX_VALUE)); + ArrowFileWriter arrowFileWriter = + new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel()); + + arrowFileWriter.start(); + + for (int i = 0; i < this.entries;) { + int numRows = Math.min(this.batchSize, this.entries - i); + vectorSchemaRoot.setRowCount(numRows); + for (Field field : vectorSchemaRoot.getSchema().getFields()) { + FieldVector vector = vectorSchemaRoot.getVector(field.getName()); + switch (vector.getMinorType()) { + case INT: + intField(vector, numRows); + break; + case FLOAT4: + floatField(vector, numRows); + break; + case VARCHAR: + varCharField(vector, numRows); + break; + case BIGINT: + longField(vector, numRows); + break; + default: + throw new 
IllegalStateException("Not supported type yet: " + vector.getMinorType()); + } + } + arrowFileWriter.writeBatch(); + i += numRows; + } + arrowFileWriter.end(); + arrowFileWriter.close(); + fileOutputStream.flush(); + fileOutputStream.close(); + } + + private void intField(FieldVector fieldVector, int rowCount) { + IntVector intVector = (IntVector) fieldVector; + intVector.setInitialCapacity(rowCount); + intVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + intVector.set(i, 1, intValue); + this.intValue++; + } + fieldVector.setValueCount(rowCount); + } + + private void floatField(FieldVector fieldVector, int rowCount) { + FloatingPointVector floatingPointVector = (FloatingPointVector) fieldVector; + floatingPointVector.setInitialCapacity(rowCount); + floatingPointVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + float value = this.floatValue; + floatingPointVector.setWithPossibleTruncate(i, value); + this.floatValue++; + } + fieldVector.setValueCount(rowCount); + } + + private void varCharField(FieldVector fieldVector, int rowCount) { + VarCharVector varCharVector = (VarCharVector) fieldVector; + varCharVector.setInitialCapacity(rowCount); + varCharVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + String value = String.valueOf(this.stringValue); + varCharVector.set(i, new Text(value)); + this.stringValue++; + } + fieldVector.setValueCount(rowCount); + } + + private void longField(FieldVector fieldVector, int rowCount) { + BigIntVector longVector = (BigIntVector) fieldVector; + longVector.setInitialCapacity(rowCount); + longVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + longVector.set(i, this.longValue); + this.longValue++; + } + fieldVector.setValueCount(rowCount); + } +} diff --git a/arrow/src/test/resources/arrow-model.json b/arrow/src/test/resources/arrow-model.json new file mode 100644 index 00000000000..fed2210b030 --- /dev/null +++ b/arrow/src/test/resources/arrow-model.json @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "version": "1.0", + "defaultSchema": "ARROW", + "schemas": [ + { + "name": "ARROW", + "type": "custom", + "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory", + "operand": { + "directory": "arrow" + } + } + ] +} diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 79fb7cc56bb..3cc33230dae 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -97,6 +97,10 @@ dependencies { apiv("net.java.dev.jna:jna") apiv("net.java.dev.jna:jna-platform") apiv("net.sf.opencsv:opencsv") + apiv("org.apache.arrow:arrow-memory-netty", "arrow") + apiv("org.apache.arrow:arrow-vector", "arrow") + apiv("org.apache.arrow:arrow-jdbc", "arrow") + apiv("org.apache.arrow.gandiva:arrow-gandiva", "arrow-gandiva") apiv("org.apache.calcite.avatica:avatica-core", "calcite.avatica") apiv("org.apache.calcite.avatica:avatica-server", "calcite.avatica") apiv("org.apache.cassandra:cassandra-all") diff --git a/build.gradle.kts b/build.gradle.kts index 0df08ac15c0..e1b9e497caf 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -212,7 +212,7 @@ val javadocAggregateIncludingTests by tasks.registering(Javadoc::class) { } val adaptersForSqlline = listOf( - ":babel", ":cassandra", ":druid", ":elasticsearch", + ":arrow", ":babel", ":cassandra", ":druid", ":elasticsearch", ":file", ":geode", ":innodb", ":kafka", ":mongodb", ":pig", ":piglet", ":plus", ":redis", ":spark", ":splunk") diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java index 3f42e065c83..387bab43ca3 100644 --- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java +++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java @@ -95,8 +95,9 @@ protected EnumerableMergeJoin( final List collations = traits.getTraits(RelCollationTraitDef.INSTANCE); assert collations != null && collations.size() > 0; - ImmutableIntList rightKeys = joinInfo.rightKeys - .incr(left.getRowType().getFieldCount()); + final int leftInputFieldCount = left.getRowType().getFieldCount(); + ImmutableIntList rightKeys = + joinInfo.rightKeys.map(i -> i + leftInputFieldCount); // Currently it has very limited ability to represent the equivalent traits // due to the flaw of RelCompositeTrait, so the following case is totally // legit, but not yet supported: @@ -217,9 +218,9 @@ private static List getCollations(RelTraitSet traits) { int leftInputFieldCount = left.getRowType().getFieldCount(); List reqKeys = RelCollations.ordinals(collation); - List leftKeys = joinInfo.leftKeys.toIntegerList(); + List leftKeys = joinInfo.leftKeys; List rightKeys = - joinInfo.rightKeys.incr(leftInputFieldCount).toIntegerList(); + joinInfo.rightKeys.map(i -> i + leftInputFieldCount); ImmutableBitSet reqKeySet = ImmutableBitSet.of(reqKeys); ImmutableBitSet leftKeySet = ImmutableBitSet.of(joinInfo.leftKeys); diff --git a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java index 4647be8c151..d8a0babc6c8 100644 --- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java +++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java @@ -285,7 +285,7 @@ private AnalyzeViewResult analyze_(SqlValidator validator, String sql, List columnMapping; final Map projectMap = new HashMap<>(); if (project == null) { - columnMapping = ImmutableIntList.range(0, targetRowType.getFieldCount()); + columnMapping = 
Util.range(targetRowType.getFieldCount()); } else { columnMapping = new ArrayList<>(); for (Ord node : Ord.zip(project.getProjects())) { diff --git a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index e5afc62468f..66190b640ee 100644 --- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -1262,7 +1262,7 @@ private void substituteSubQuery(Blackboard bb, SubQuery subQuery) { typeFactory.createSqlType(SqlTypeName.BIGINT); final RelNode seek = converted.r.getInput(0); // fragile final int keyCount = leftKeys.size(); - final List args = ImmutableIntList.range(0, keyCount); + final List args = Util.range(keyCount); LogicalAggregate aggregate = LogicalAggregate.create(seek, ImmutableList.of(), diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java index 142f1d30d27..c40a6d7184c 100644 --- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java +++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java @@ -341,6 +341,8 @@ public enum BuiltInMethod { SORTED_MULTI_MAP_ARRAYS(SortedMultiMap.class, "arrays", Comparator.class), SORTED_MULTI_MAP_SINGLETON(SortedMultiMap.class, "singletonArrayIterator", Comparator.class, List.class), + IMMUTABLE_INT_LIST_IDENTITY(ImmutableIntList.class, "identity", int.class), + IMMUTABLE_INT_LIST_COPY_OF(ImmutableIntList.class, "copyOf", int[].class), BINARY_SEARCH5_LOWER(BinarySearch.class, "lowerBound", Object[].class, Object.class, int.class, int.class, Comparator.class), BINARY_SEARCH5_UPPER(BinarySearch.class, "upperBound", Object[].class, diff --git a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java index cb34fe3d539..8ff15859b0a 100644 --- a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java +++ b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java @@ -38,6 +38,7 @@ import java.util.NoSuchElementException; import java.util.function.Consumer; import java.util.function.IntConsumer; +import java.util.function.IntUnaryOperator; import static org.apache.calcite.linq4j.Nullness.castNonNull; @@ -70,13 +71,25 @@ public static ImmutableIntList of() { * Creates an ImmutableIntList from an array of {@code int}. */ public static ImmutableIntList of(int... ints) { + if (ints.length == 0) { + return EMPTY; + } return new ImmutableIntList(ints.clone()); } + /** Same as {@link #of(int...)}, but less ambiguous for code generators + * and compilers. */ + public static ImmutableIntList copyOf(int... ints) { + return of(ints); + } + /** * Creates an ImmutableIntList from an array of {@code Number}. */ public static ImmutableIntList copyOf(Number... numbers) { + if (numbers.length == 0) { + return EMPTY; + } final int[] ints = new int[numbers.length]; for (int i = 0; i < ints.length; i++) { ints[i] = numbers[i].intValue(); @@ -108,6 +121,9 @@ public static ImmutableIntList copyOf(Iterator list) { private static ImmutableIntList copyFromCollection( Collection list) { + if (list.isEmpty()) { + return EMPTY; + } final int[] ints = new int[list.size()]; int i = 0; for (Number number : list) { @@ -192,7 +208,11 @@ public int[] toIntArray() { return ints.clone(); } - /** Returns an List of {@code Integer}. */ + /** Returns a List of {@code Integer}. 
+ * + * @deprecated Instead of {@code list.toIntegerList()}, write + * {@code new ArrayList<>(list)} or just {@code list}. */ + @Deprecated // to be removed before 2.0 public List<Integer> toIntegerList() { ArrayList<Integer> arrayList = new ArrayList<>(size()); for (int i : ints) { @@ -273,7 +293,10 @@ public ImmutableIntList append(int element) { /** Returns a list that contains the values lower to upper - 1. * - * <p>For example, {@code range(1, 3)} contains [1, 2]. */ + * <p>
For example, {@code range(1, 3)} contains [1, 2]. + * + * @deprecated Use {@link Util#range(int, int)} */ + @Deprecated // to be removed before 2.0 public static List range(final int lower, final int upper) { return Functions.generate(upper - lower, index -> lower + index); } @@ -283,6 +306,9 @@ public static List range(final int lower, final int upper) { * @see Mappings#isIdentity(List, int) */ public static ImmutableIntList identity(int count) { + if (count == 0) { + return EMPTY; + } final int[] integers = new int[count]; for (int i = 0; i < integers.length; i++) { integers[i] = i; @@ -298,16 +324,22 @@ public ImmutableIntList appendAll(Iterable list) { return ImmutableIntList.copyOf(Iterables.concat(this, list)); } + /** Applies an operator to each element. */ + public ImmutableIntList map(IntUnaryOperator operator) { + final int[] integers = new int[ints.length]; + for (int i = 0; i < ints.length; i++) { + integers[i] = operator.applyAsInt(ints[i]); + } + return new ImmutableIntList(integers); + } + /** * Increments {@code offset} to each element of the list and * returns a new int list. */ + @Deprecated // to be removed before 2.0 public ImmutableIntList incr(int offset) { - final int[] integers = new int[ints.length]; - for (int i = 0; i < ints.length; i++) { - integers[i] = ints[i] + offset; - } - return new ImmutableIntList(integers); + return map(i -> i + offset); } /** Special sub-class of {@link ImmutableIntList} that is always @@ -331,6 +363,10 @@ private static class EmptyImmutableIntList extends ImmutableIntList { @Override public ListIterator listIterator() { return Collections.emptyList().listIterator(); } + + @Override public ImmutableIntList map(IntUnaryOperator operator) { + return this; + } } /** Extension to {@link com.google.common.collect.UnmodifiableListIterator} diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java index 51cc683dc45..9fcea5ceb4f 100644 --- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java +++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java @@ -34,6 +34,9 @@ import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Source; +import org.apache.calcite.util.Util; + +import com.google.common.primitives.Ints; import java.lang.reflect.Type; import java.util.concurrent.atomic.AtomicBoolean; @@ -88,7 +91,7 @@ public Enumerable project(final DataContext root, RelOptTable relOptTable) { // Request all fields. 
final int fieldCount = relOptTable.getRowType().getFieldCount(); - final int[] fields = CsvEnumerator.identityList(fieldCount); + final int[] fields = Ints.toArray(Util.range(fieldCount)); return new CsvTableScan(context.getCluster(), relOptTable, this, fields); } } diff --git a/file/src/main/java/org/apache/calcite/adapter/file/CsvTranslatableTable.java b/file/src/main/java/org/apache/calcite/adapter/file/CsvTranslatableTable.java index e927a586e07..71a158049b6 100644 --- a/file/src/main/java/org/apache/calcite/adapter/file/CsvTranslatableTable.java +++ b/file/src/main/java/org/apache/calcite/adapter/file/CsvTranslatableTable.java @@ -33,6 +33,9 @@ import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Source; +import org.apache.calcite.util.Util; + +import com.google.common.primitives.Ints; import java.lang.reflect.Type; import java.util.concurrent.atomic.AtomicBoolean; @@ -87,7 +90,7 @@ public Enumerable project(final DataContext root, RelOptTable relOptTable) { // Request all fields. final int fieldCount = relOptTable.getRowType().getFieldCount(); - final int[] fields = CsvEnumerator.identityList(fieldCount); + final int[] fields = Ints.toArray(Util.range(fieldCount)); return new CsvTableScan(context.getCluster(), relOptTable, this, fields); } } diff --git a/file/src/main/java/org/apache/calcite/adapter/file/FileEnumerator.java b/file/src/main/java/org/apache/calcite/adapter/file/FileEnumerator.java index 6b0ce467021..f32a2694937 100644 --- a/file/src/main/java/org/apache/calcite/adapter/file/FileEnumerator.java +++ b/file/src/main/java/org/apache/calcite/adapter/file/FileEnumerator.java @@ -17,6 +17,9 @@ package org.apache.calcite.adapter.file; import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.Util; + +import com.google.common.primitives.Ints; import org.jsoup.select.Elements; @@ -33,7 +36,7 @@ class FileEnumerator implements Enumerator { private Object current; FileEnumerator(Iterator iterator, FileRowConverter converter) { - this(iterator, converter, identityList(converter.width())); + this(iterator, converter, Ints.toArray(Util.range(converter.width()))); } FileEnumerator(Iterator iterator, FileRowConverter converter, @@ -76,15 +79,4 @@ class FileEnumerator implements Enumerator { @Override public void close() { } - /** Returns an array of integers {0, ..., n - 1}. */ - private static int[] identityList(int n) { - int[] integers = new int[n]; - - for (int i = 0; i < n; i++) { - integers[i] = i; - } - - return integers; - } - } diff --git a/gradle.properties b/gradle.properties index 0e654e4ac9a..c0d50c977ff 100644 --- a/gradle.properties +++ b/gradle.properties @@ -83,6 +83,8 @@ jandex.version=2.2.3.Final # elasticsearch does not like asm:6.2.1+ aggdesigner-algorithm.version=6.0 apiguardian-api.version=1.1.2 +arrow-gandiva.version=15.0.0 +arrow.version=15.0.0 asm.version=7.2 byte-buddy.version=1.9.3 cassandra-all.version=4.0.1 diff --git a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java index bb9fa46122e..16796630953 100644 --- a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java +++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java @@ -33,7 +33,7 @@ * Unit test cases for Kafka adapter. 
*/ class KafkaAdapterTest { - protected static final URL MODEL = KafkaAdapterTest.class.getResource("/kafka.model.json"); + protected static final URL MODEL = KafkaAdapterTest.class.getResource("/kafka-model.json"); private CalciteAssert.AssertThat assertModel(String model) { // ensure that Schema from this instance is being used diff --git a/kafka/src/test/resources/kafka.model.json b/kafka/src/test/resources/kafka-model.json similarity index 100% rename from kafka/src/test/resources/kafka.model.json rename to kafka/src/test/resources/kafka-model.json diff --git a/settings.gradle.kts b/settings.gradle.kts index a5ea5367709..adb373f79b9 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -67,6 +67,7 @@ rootProject.name = "calcite" include( "bom", "release", + "arrow", "babel", "cassandra", "core", diff --git a/site/_docs/adapter.md b/site/_docs/adapter.md index 1ceff6518a0..72aa632adf7 100644 --- a/site/_docs/adapter.md +++ b/site/_docs/adapter.md @@ -27,6 +27,7 @@ limitations under the License. A schema adapter allows Calcite to read particular kind of data, presenting the data as tables within a schema. +* [Arrow adapter](arrow_adapter.html) (calcite-arrow) * [Cassandra adapter](cassandra_adapter.html) (calcite-cassandra) * CSV adapter (example/csv) * [Druid adapter](druid_adapter.html) (calcite-druid) @@ -36,6 +37,7 @@ presenting the data as tables within a schema. * [Geode adapter](geode_adapter.html) (calcite-geode) * [InnoDB adapter](innodb_adapter.html) (calcite-innodb) * JDBC adapter (part of calcite-core) +* [Kafka adapter](kafka_adapter.html) (calcite-kafka) * MongoDB adapter (calcite-mongodb) * [OS adapter](os_adapter.html) (calcite-os) * [Pig adapter](pig_adapter.html) (calcite-pig) @@ -44,7 +46,6 @@ presenting the data as tables within a schema. * Spark adapter (calcite-spark) * Splunk adapter (calcite-splunk) * Eclipse Memory Analyzer (MAT) adapter (mat-calcite-plugin) -* [Apache Kafka adapter](kafka_adapter.html) ### Other language interfaces diff --git a/site/_docs/arrow_adapter.md b/site/_docs/arrow_adapter.md new file mode 100644 index 00000000000..771be10e4bc --- /dev/null +++ b/site/_docs/arrow_adapter.md @@ -0,0 +1,89 @@ +--- +layout: docs +title: Arrow adapter +permalink: /docs/arrow_adapter.html +--- + + +**Note**: Arrow Adapter is an experimental feature; +changes in public API and usage are expected. + +## Overview + +Calcite's adapter for Apache Arrow is able to read and process data in Arrow +format. + +It can read files in Arrow's +[Feather format](https://wesmckinney.com/blog/feather-and-apache-arrow/) +(which generally have a `.arrow` suffix) in the same way that the +[File Adapter](file_adapter.html) can read `.csv` files. + +The File Adapter immediately translates data to +[Enumerable format]({{ site.apiRoot }}/org/apache/calcite/adapter/enumerable/EnumerableConvention.html) +for further processing, but Arrow is an extremely efficient format for +processing in-memory data, so the Arrow Adapter has implementations of other +relational operators for further processing. + +## A simple example + +Let's start with a simple example. First, we need a +[model definition]({{ site.baseurl }}/docs/model.html), +as follows. 
+ +{% highlight json %} +{ + "version": "1.0", + "defaultSchema": "ARROW", + "schemas": [ + { + "name": "ARROW", + "type": "custom", + "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory", + "operand": { + "directory": "arrow" + } + } + ] +} +{% endhighlight %} + +The model file is stored as `arrow/src/test/resources/arrow-model.json`, +so you can connect via [`sqlline`](https://github.com/julianhyde/sqlline) +as follows: + +{% highlight bash %} +$ ./sqlline +sqlline> !connect jdbc:calcite:model=arrow/src/test/resources/arrow-model.json admin admin +sqlline> select * from arrow.test; ++----------+----------+------------+ +| fieldOne | fieldTwo | fieldThree | ++----------+----------+------------+ +| 1 | abc | 1.2 | +| 2 | def | 3.4 | +| 3 | xyz | 5.6 | +| 4 | abcd | 1.22 | +| 5 | defg | 3.45 | +| 6 | xyza | 5.67 | ++----------+----------+------------+ +6 rows selected +{% endhighlight %} + +The `arrow` directory contains a file called `test.arrow`, and so it shows up as +a table called `test`. diff --git a/site/_docs/kafka_adapter.md b/site/_docs/kafka_adapter.md index 23a019e6a32..114dbd9c264 100644 --- a/site/_docs/kafka_adapter.md +++ b/site/_docs/kafka_adapter.md @@ -22,11 +22,11 @@ limitations under the License. {% endcomment %} --> -**Note**: +**Note**: The Kafka Adapter is an experimental feature; +changes in public API and usage are expected. -KafkaAdapter is an experimental feature, changes in public API and usage are expected. - -For instructions on downloading and building Calcite, start with the[tutorial]({{ site.baseurl }}/docs/tutorial.html). +For instructions on downloading and building Calcite, start with the +[tutorial]({{ site.baseurl }}/docs/tutorial.html). The Kafka adapter exposes an Apache Kafka topic as a STREAM table, so it can be queried using [Calcite Stream SQL]({{ site.baseurl }}/docs/stream.html). Note that the adapter will not attempt to scan all topics, @@ -62,21 +62,21 @@ A basic example of a model file is given below: } {% endhighlight %} -Note that: - -1. As Kafka message is schemaless, a [KafkaRowConverter]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverter.html) - is required to specify row schema explicitly(with parameter `row.converter`), and - how to decode Kafka message to Calcite row. [KafkaRowConverterImpl]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.html) - is used if not provided; +As Kafka messages are schemaless, +a [KafkaRowConverter]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverter.html) +is required to specify row schema explicitly (with parameter `row.converter`), +and how to decode Kafka message to Calcite row. +[KafkaRowConverterImpl]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.html) +is used if not provided. -2. More consumer settings can be added in parameter `consumer.params`; +More consumer settings can be added in parameter `consumer.params`. -Assuming this file is stored as `kafka.model.json`, you can connect to Kafka via +Assuming this file is stored as `kafka-model.json`, you can connect to Kafka via [`sqlline`](https://github.com/julianhyde/sqlline) as follows: {% highlight bash %} $ ./sqlline -sqlline> !connect jdbc:calcite:model=kafka.model.json admin admin +sqlline> !connect jdbc:calcite:model=kafka-model.json admin admin {% endhighlight %} `sqlline` will now accept SQL queries which access your Kafka topics.
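For instance, a first sanity-check query might look like the sketch below. Both names are placeholders: `KAFKA` stands for whatever default schema your model declares, and `TAXI_ORDERS` for the table name it maps to your topic.

{% highlight sql %}
-- Streaming query sketch: emits a row for every new message on the topic
SELECT STREAM *
FROM KAFKA.TAXI_ORDERS;
{% endhighlight %}

Because the adapter exposes the topic as a STREAM table, the query runs continuously until you cancel it, rather than terminating after a fixed result set.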