From c50e955a9b4ffa88cb4ea4ba690f3b7e1e8a33a9 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Sun, 25 Feb 2024 20:13:20 +0100 Subject: [PATCH] splitting ArrowEnumerator into sub-classes --- .../arrow/AbstractArrowEnumerator.java | 81 ++++++++ .../adapter/arrow/ArrowEnumerable.java | 15 +- .../adapter/arrow/ArrowEnumerator.java | 184 ------------------ .../adapter/arrow/ArrowFilterEnumerator.java | 97 +++++++++ .../adapter/arrow/ArrowProjectEnumerator.java | 76 ++++++++ .../calcite/adapter/arrow/ArrowTable.java | 4 +- 6 files changed, 266 insertions(+), 191 deletions(-) create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java delete mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerator.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java new file mode 100644 index 00000000000..5faf4e895b5 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Enumerator that reads from a collection of Arrow value-vectors. + */ +abstract class AbstractArrowEnumerator implements Enumerator { + protected final ArrowFileReader arrowFileReader; + protected int currRowIndex = -1; + protected int rowCount; + protected final List valueVectors; + protected final List fields; + + AbstractArrowEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields) { + this.arrowFileReader = arrowFileReader; + this.fields = fields; + this.valueVectors = new ArrayList<>(fields.size()); + } + + abstract void evaluateOperator(ArrowRecordBatch arrowRecordBatch); + + protected void loadNextArrowBatch() { + try { + final VectorSchemaRoot vsr = arrowFileReader.getVectorSchemaRoot(); + for (int i : fields) { + this.valueVectors.add(vsr.getVector(i)); + } + this.rowCount = vsr.getRowCount(); + VectorUnloader vectorUnloader = new VectorUnloader(vsr); + ArrowRecordBatch arrowRecordBatch = vectorUnloader.getRecordBatch(); + evaluateOperator(arrowRecordBatch); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + } + + @Override public Object current() { + if (fields.size() == 1) { + return this.valueVectors.get(0).getObject(currRowIndex); + } + Object[] current = new Object[valueVectors.size()]; + for (int i = 0; i < valueVectors.size(); i++) { + ValueVector vector = this.valueVectors.get(i); + current[i] = vector.getObject(currRowIndex); + } + return current; + } + + @Override public void reset() { + throw new UnsupportedOperationException(); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java index d054ba8a4d8..7f598f3abe2 100644 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java @@ -32,13 +32,12 @@ */ class ArrowEnumerable extends AbstractEnumerable { private final ArrowFileReader arrowFileReader; + private final ImmutableIntList fields; private final @Nullable Projector projector; private final @Nullable Filter filter; - private final ImmutableIntList fields; - ArrowEnumerable(ArrowFileReader arrowFileReader, - @Nullable Projector projector, @Nullable Filter filter, - ImmutableIntList fields) { + ArrowEnumerable(ArrowFileReader arrowFileReader, ImmutableIntList fields, + @Nullable Projector projector, @Nullable Filter filter) { this.arrowFileReader = arrowFileReader; this.projector = projector; this.filter = filter; @@ -47,7 +46,13 @@ class ArrowEnumerable extends AbstractEnumerable { @Override public Enumerator enumerator() { try { - return new ArrowEnumerator(projector, filter, fields, arrowFileReader); + if (projector != null) { + return new ArrowProjectEnumerator(arrowFileReader, fields, projector); + } else if (filter != null) { + return new ArrowFilterEnumerator(arrowFileReader, fields, filter); + } + throw new IllegalArgumentException( + "The arrow enumerator must have either a filter or a projection"); } catch (Exception e) { throw Util.toUnchecked(e); } diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerator.java deleted file mode 100644 index d8f4670e1bb..00000000000 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerator.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.adapter.arrow; - -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.util.ImmutableIntList; -import org.apache.calcite.util.Util; - -import org.apache.arrow.gandiva.evaluator.Filter; -import org.apache.arrow.gandiva.evaluator.Projector; -import org.apache.arrow.gandiva.evaluator.SelectionVector; -import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16; -import org.apache.arrow.gandiva.exceptions.GandivaException; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.ValueVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; -import org.apache.arrow.vector.ipc.ArrowFileReader; -import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; - -import org.checkerframework.checker.nullness.qual.Nullable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * Enumerator that reads from a collection of Arrow value-vectors. - */ -class ArrowEnumerator implements Enumerator { - private final BufferAllocator allocator; - private final ArrowFileReader arrowFileReader; - private final @Nullable Projector projector; - private final @Nullable Filter filter; - private int rowIndex = -1; - private final List valueVectors; - private final List fields; - private int rowSize; - private @Nullable ArrowBuf buf; - private @Nullable SelectionVector selectionVector; - private int selectionVectorIndex = 0; - - ArrowEnumerator(@Nullable Projector projector, @Nullable Filter filter, - ImmutableIntList fields, ArrowFileReader arrowFileReader) { - this.allocator = new RootAllocator(Long.MAX_VALUE); - this.projector = projector; - this.filter = filter; - this.arrowFileReader = arrowFileReader; - this.fields = fields; - this.valueVectors = new ArrayList<>(fields.size()); - - // Set up fields so that first call to moveNext() will trigger a call to - // loadNextBatch(). - if (projector != null) { - rowIndex = rowSize = 0; - } else { - selectionVector = null; - selectionVectorIndex = 0; - } - } - - @Override public Object current() { - if (fields.size() == 1) { - return this.valueVectors.get(0).getObject(rowIndex); - } - Object[] current = new Object[valueVectors.size()]; - for (int i = 0; i < valueVectors.size(); i++) { - ValueVector vector = this.valueVectors.get(i); - current[i] = vector.getObject(rowIndex); - } - return current; - } - - @Override public boolean moveNext() { - if (projector != null) { - if (rowIndex >= rowSize - 1) { - final boolean hasNextBatch; - try { - hasNextBatch = arrowFileReader.loadNextBatch(); - } catch (IOException e) { - throw Util.toUnchecked(e); - } - if (hasNextBatch) { - rowIndex = 0; - this.valueVectors.clear(); - loadNextArrowBatch(); - return true; - } else { - return false; - } - } else { - rowIndex++; - return true; - } - } else { - if (selectionVector == null - || selectionVectorIndex >= selectionVector.getRecordCount()) { - boolean hasNextBatch; - while (true) { - try { - hasNextBatch = arrowFileReader.loadNextBatch(); - } catch (IOException e) { - throw Util.toUnchecked(e); - } - if (hasNextBatch) { - selectionVectorIndex = 0; - this.valueVectors.clear(); - loadNextArrowBatch(); - assert selectionVector != null; - if (selectionVectorIndex >= selectionVector.getRecordCount()) { - // the "filtered" batch is empty, but there may be more batches to fetch - continue; - } - rowIndex = selectionVector.getIndex(selectionVectorIndex++); - return true; - } else { - return false; - } - } - } else { - rowIndex = selectionVector.getIndex(selectionVectorIndex++); - return true; - } - } - } - - private void loadNextArrowBatch() { - try { - final VectorSchemaRoot vsr = arrowFileReader.getVectorSchemaRoot(); - for (int i : fields) { - this.valueVectors.add(vsr.getVector(i)); - } - this.rowSize = vsr.getRowCount(); - VectorUnloader vectorUnloader = new VectorUnloader(vsr); - ArrowRecordBatch arrowRecordBatch = vectorUnloader.getRecordBatch(); - if (projector != null) { - projector.evaluate(arrowRecordBatch, valueVectors); - } - if (filter != null) { - this.buf = this.allocator.buffer(rowSize * 2); - this.selectionVector = new SelectionVectorInt16(buf); - filter.evaluate(arrowRecordBatch, selectionVector); - } - } catch (IOException | GandivaException e) { - throw Util.toUnchecked(e); - } - } - - @Override public void reset() { - throw new UnsupportedOperationException(); - } - - @Override public void close() { - try { - if (projector != null) { - projector.close(); - } - if (filter != null) { - if (buf != null) { - buf.close(); - } - filter.close(); - } - } catch (GandivaException e) { - throw Util.toUnchecked(e); - } - } -} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java new file mode 100644 index 00000000000..326c3086d01 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.arrow.gandiva.evaluator.Filter; +import org.apache.arrow.gandiva.evaluator.SelectionVector; +import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import java.io.IOException; + +/** + * Enumerator that reads from a filtered collection of Arrow value-vectors. + */ +class ArrowFilterEnumerator extends AbstractArrowEnumerator { + private final BufferAllocator allocator; + private final Filter filter; + private ArrowBuf buf; + private SelectionVector selectionVector; + private int selectionVectorIndex; + + ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, Filter filter) { + super(arrowFileReader, fields); + this.allocator = new RootAllocator(Long.MAX_VALUE); + this.filter = filter; + } + + void evaluateOperator(ArrowRecordBatch arrowRecordBatch) { + try { + this.buf = this.allocator.buffer((long) rowCount * 2); + this.selectionVector = new SelectionVectorInt16(buf); + filter.evaluate(arrowRecordBatch, selectionVector); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } + + @Override public boolean moveNext() { + if (selectionVector == null + || selectionVectorIndex >= selectionVector.getRecordCount()) { + boolean hasNextBatch; + while (true) { + try { + hasNextBatch = arrowFileReader.loadNextBatch(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + if (hasNextBatch) { + selectionVectorIndex = 0; + this.valueVectors.clear(); + loadNextArrowBatch(); + assert selectionVector != null; + if (selectionVectorIndex >= selectionVector.getRecordCount()) { + // the "filtered" batch is empty, but there may be more batches to fetch + continue; + } + currRowIndex = selectionVector.getIndex(selectionVectorIndex++); + } + return hasNextBatch; + } + } else { + currRowIndex = selectionVector.getIndex(selectionVectorIndex++); + return true; + } + } + + @Override public void close() { + try { + buf.close(); + filter.close(); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java new file mode 100644 index 00000000000..71383f7af03 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import java.io.IOException; + +/** + * Enumerator that reads from a projected collection of Arrow value-vectors. + */ +class ArrowProjectEnumerator extends AbstractArrowEnumerator { + private final Projector projector; + + ArrowProjectEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, + Projector projector) { + super(arrowFileReader, fields); + this.projector = projector; + } + + protected void evaluateOperator(ArrowRecordBatch arrowRecordBatch) { + try { + projector.evaluate(arrowRecordBatch, valueVectors); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } + + @Override public boolean moveNext() { + if (currRowIndex >= rowCount - 1) { + final boolean hasNextBatch; + try { + hasNextBatch = arrowFileReader.loadNextBatch(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + if (hasNextBatch) { + currRowIndex = 0; + this.valueVectors.clear(); + loadNextArrowBatch(); + } + return hasNextBatch; + } else { + currRowIndex++; + return true; + } + } + + @Override public void close() { + try { + projector.close(); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java index f0931153fed..1f50c94128e 100644 --- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java @@ -100,7 +100,7 @@ public Enumerable query(DataContext root, ImmutableIntList fields, final Projector projector; final Filter filter; - if (conditions.size() == 0) { + if (conditions.isEmpty()) { filter = null; final List expressionTrees = new ArrayList<>(); @@ -144,7 +144,7 @@ public Enumerable query(DataContext root, ImmutableIntList fields, } } - return new ArrowEnumerable(arrowFileReader, projector, filter, fields); + return new ArrowEnumerable(arrowFileReader, fields, projector, filter); } @Override public Queryable asQueryable(QueryProvider queryProvider,