diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py
index e0b58e34cc15..be03bcc47787 100644
--- a/ibis/backends/flink/__init__.py
+++ b/ibis/backends/flink/__init__.py
@@ -24,6 +24,7 @@
 from ibis.backends.sql import SQLBackend
 from ibis.backends.tests.errors import Py4JJavaError
 from ibis.expr.operations.udf import InputType
+from ibis.formats.pyarrow import PyArrowData
 from ibis.util import gen_name
 
 if TYPE_CHECKING:
@@ -940,7 +941,6 @@ def to_pyarrow_batches(
         limit: int | str | None = None,
         **kwargs: Any,
     ):
-        import pyarrow as pa
         import pyarrow_hotfix  # noqa: F401
 
         ibis_table = expr.as_table()
@@ -963,9 +963,7 @@ def to_pyarrow_batches(
             # TODO (mehmet): `limit` is discarded in `execute()`. Is this intentional?
             df = df.head(limit)
 
-        ibis_schema = ibis_table.schema()
-        arrow_schema = ibis_schema.to_pyarrow()
-        arrow_table = pa.Table.from_pandas(df, schema=arrow_schema)
+        arrow_table = PyArrowData.convert_table(df, ibis_table.schema())
         return arrow_table.to_reader()
 
     def _from_ibis_table_to_pyflink_table(self, table: ir.Table) -> Table | None:
diff --git a/ibis/backends/impala/__init__.py b/ibis/backends/impala/__init__.py
index e00276d57634..9e2ee27c8eb4 100644
--- a/ibis/backends/impala/__init__.py
+++ b/ibis/backends/impala/__init__.py
@@ -1144,7 +1144,6 @@ def to_pyarrow(
         limit: int | str | None = None,
         **kwargs: Any,
     ) -> pa.Table:
-        import pyarrow as pa
         import pyarrow_hotfix  # noqa: F401
 
         from ibis.formats.pyarrow import PyArrowData
@@ -1152,12 +1151,9 @@ def to_pyarrow(
         self._run_pre_execute_hooks(expr)
 
         table_expr = expr.as_table()
-        output = pa.Table.from_pandas(
-            self.execute(table_expr, params=params, limit=limit, **kwargs),
-            preserve_index=False,
-        )
-        table = PyArrowData.convert_table(output, table_expr.schema())
-        return expr.__pyarrow_result__(table)
+        df = self.execute(table_expr, params=params, limit=limit, **kwargs)
+        pa_table = PyArrowData.convert_table(df, table_expr.schema())
+        return expr.__pyarrow_result__(pa_table)
 
     def to_pyarrow_batches(
         self,
diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py
index 6be88633b5f5..c9717265787f 100644
--- a/ibis/backends/mysql/__init__.py
+++ b/ibis/backends/mysql/__init__.py
@@ -26,6 +26,7 @@
 from ibis.backends.mysql.compiler import MySQLCompiler
 from ibis.backends.sql import SQLBackend
 from ibis.backends.sql.compiler import TRUE, C
+from ibis.formats.pyarrow import PyArrowData
 
 if TYPE_CHECKING:
     from collections.abc import Mapping
@@ -509,19 +510,14 @@ def to_pyarrow_batches(
         chunk_size: int = 1_000_000,
         **_: Any,
     ) -> pa.ipc.RecordBatchReader:
-        import pyarrow as pa
-
         self._run_pre_execute_hooks(expr)
-
         schema = expr.as_table().schema()
         with self._safe_raw_sql(
             self.compile(expr, limit=limit, params=params)
         ) as cursor:
             df = self._fetch_from_cursor(cursor, schema)
-            table = pa.Table.from_pandas(
-                df, schema=schema.to_pyarrow(), preserve_index=False
-            )
-            return table.to_reader(max_chunksize=chunk_size)
+            pa_table = PyArrowData.convert_table(df, schema)
+            return pa_table.to_reader(max_chunksize=chunk_size)
 
     def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
         import pandas as pd
diff --git a/ibis/backends/sqlite/__init__.py b/ibis/backends/sqlite/__init__.py
index d3825dee05d1..9a66055d59d6 100644
--- a/ibis/backends/sqlite/__init__.py
+++ b/ibis/backends/sqlite/__init__.py
@@ -21,6 +21,7 @@
 from ibis.backends.sqlite.compiler import SQLiteCompiler
 from ibis.backends.sqlite.converter import SQLitePandasData
 from ibis.backends.sqlite.udf import ignore_nulls, register_all
+from ibis.formats.pyarrow import PyArrowData
 
 if TYPE_CHECKING:
     from collections.abc import Iterator, Mapping
@@ -269,8 +270,6 @@ def to_pyarrow_batches(
         chunk_size: int = 1_000_000,
         **_: Any,
     ) -> pa.ipc.RecordBatchReader:
-        import pyarrow as pa
-
         self._run_pre_execute_hooks(expr)
 
         schema = expr.as_table().schema()
@@ -278,10 +277,8 @@ def to_pyarrow_batches(
             self.compile(expr, limit=limit, params=params)
         ) as cursor:
             df = self._fetch_from_cursor(cursor, schema)
-            table = pa.Table.from_pandas(
-                df, schema=schema.to_pyarrow(), preserve_index=False
-            )
-            return table.to_reader(max_chunksize=chunk_size)
+            pa_table = PyArrowData.convert_table(df, schema)
+            return pa_table.to_reader(max_chunksize=chunk_size)
 
     def _generate_create_table(self, table: sge.Table, schema: sch.Schema):
         column_defs = [
diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py
index 0ec3edff5a36..3e8d82fd0bb3 100644
--- a/ibis/backends/tests/test_client.py
+++ b/ibis/backends/tests/test_client.py
@@ -1556,10 +1556,15 @@ def test_close_connection(con):
     reason="JSON type not implemented",
 )
 @pytest.mark.notimpl(
-    ["risingwave", "sqlite"],
+    ["risingwave"],
     raises=pa.ArrowTypeError,
     reason="mismatch between output value and expected input type",
 )
+@pytest.mark.notimpl(
+    ["sqlite"],
+    raises=pa.ArrowInvalid,
+    reason="cannot mix list and non-list, non-null values",
+)
 @pytest.mark.never(
     ["snowflake"],
     raises=TypeError,
diff --git a/ibis/backends/tests/test_uuid.py b/ibis/backends/tests/test_uuid.py
index 727f1e0db438..71b406cb6801 100644
--- a/ibis/backends/tests/test_uuid.py
+++ b/ibis/backends/tests/test_uuid.py
@@ -3,6 +3,7 @@
 import contextlib
 import uuid
 
+import pyarrow as pa
 import pytest
 
 import ibis
@@ -63,3 +64,17 @@ def test_uuid_unique_each_row(con):
         con.tables.functional_alltypes.mutate(uuid=ibis.uuid()).limit(2).uuid.nunique()
     )
     assert expr.execute() == 2
+
+
+@pytest.mark.notimpl(
+    ["druid", "exasol", "oracle", "polars", "pyspark", "risingwave"],
+    raises=com.OperationNotDefinedError,
+)
+@pytest.mark.notimpl(
+    ["clickhouse", "postgres", "trino"],
+    reason="Expected bytes, got a 'UUID' object. https://github.com/ibis-project/ibis/issues/8902",
+    raises=pa.lib.ArrowTypeError,
+)
+@pytest.mark.notimpl(["pandas", "dask"], raises=com.OperationNotDefinedError)
+def test_uuid_to_pyarrow(con):
+    con.to_pyarrow(ibis.uuid())
diff --git a/ibis/formats/pyarrow.py b/ibis/formats/pyarrow.py
index 4d1ffebc4b24..32875f89d63e 100644
--- a/ibis/formats/pyarrow.py
+++ b/ibis/formats/pyarrow.py
@@ -2,6 +2,7 @@
 
 from typing import TYPE_CHECKING, Any
 
+import pandas as pd
 import pyarrow as pa
 import pyarrow_hotfix  # noqa: F401
 
@@ -243,7 +244,14 @@ def convert_scalar(cls, scalar: pa.Scalar, dtype: dt.DataType) -> pa.Scalar:
         return scalar
 
     @classmethod
-    def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
+    def convert_column(
+        cls, column: pa.Array | pd.Series, dtype: dt.DataType
+    ) -> pa.Array:
+        if isinstance(column, pd.Series):
+            if dtype.is_uuid():
+                # pyarrow doesn't support UUIDs, so we need to convert them to strings
+                column = column.astype(str)
+            column = pa.Array.from_pandas(column)
         desired_type = PyArrowType.from_ibis(dtype)
         if column.type != desired_type:
             return column.cast(desired_type)
@@ -251,7 +259,15 @@ def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
         return column
 
     @classmethod
-    def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
+    def convert_table(cls, table: pa.Table | pd.DataFrame, schema: Schema) -> pa.Table:
+        if isinstance(table, pd.DataFrame):
+            table = pa.Table.from_arrays(
+                [
+                    cls.convert_column(table[col], dtype)
+                    for col, dtype in schema.items()
+                ],
+                names=schema.names,
+            )
         desired_schema = PyArrowSchema.from_ibis(schema)
         pa_schema = table.schema
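
A minimal sketch of the conversion path this patch adds, assuming the patched PyArrowData.convert_table above; the DataFrame contents, column names, and schema here are illustrative only, not part of the change. A backend that fetches results as a pandas DataFrame can now pass that frame and the ibis schema directly, and UUID columns are stringified before pyarrow sees them.

import uuid

import pandas as pd

import ibis
from ibis.formats.pyarrow import PyArrowData

# Illustrative frame standing in for what a backend's cursor fetch might return.
df = pd.DataFrame(
    {
        "id": [1, 2],
        "token": [uuid.uuid4(), uuid.uuid4()],  # UUID objects, not strings
    }
)
schema = ibis.schema({"id": "int64", "token": "uuid"})

# With the patched convert_table, a DataFrame is accepted directly: each column
# goes through convert_column, which casts UUID values to str, builds an array
# via pa.Array.from_pandas, and casts it to the type derived from the ibis schema.
pa_table = PyArrowData.convert_table(df, schema)
print(pa_table.schema)  # id: int64, token: string (uuid maps to a string type)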