
Commit b853a19

feat: support converting UUIDs to pyarrow
1 parent 8e5cc3b commit b853a19

7 files changed: +50 −26 lines

ibis/backends/flink/__init__.py (+2 −3)
@@ -24,6 +24,7 @@
 from ibis.backends.sql import SQLBackend
 from ibis.backends.tests.errors import Py4JJavaError
 from ibis.expr.operations.udf import InputType
+from ibis.formats.pyarrow import PyArrowData
 from ibis.util import gen_name

 if TYPE_CHECKING:
@@ -963,9 +964,7 @@ def to_pyarrow_batches(
         # TODO (mehmet): `limit` is discarded in `execute()`. Is this intentional?
         df = df.head(limit)

-        ibis_schema = ibis_table.schema()
-        arrow_schema = ibis_schema.to_pyarrow()
-        arrow_table = pa.Table.from_pandas(df, schema=arrow_schema)
+        arrow_table = PyArrowData.convert_table(df, ibis_table.schema())
         return arrow_table.to_reader()

     def _from_ibis_table_to_pyflink_table(self, table: ir.Table) -> Table | None:
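The same substitution repeats in the impala, mysql, and sqlite diffs below: `pa.Table.from_pandas(df, schema=...)` becomes `PyArrowData.convert_table(df, schema)`. A minimal sketch of the failure mode being routed around (the column data is illustrative; the error text is the one cited in test_uuid.py below):

```python
import uuid

import pandas as pd
import pyarrow as pa

# A backend driver may hand back a column of uuid.UUID objects.
df = pd.DataFrame({"id": [uuid.uuid4(), uuid.uuid4()]})

# Coercing UUID objects directly to a pyarrow string column fails:
# pyarrow.lib.ArrowTypeError: Expected bytes, got a 'UUID' object
try:
    pa.Table.from_pandas(df, schema=pa.schema([("id", pa.string())]))
except pa.lib.ArrowTypeError as err:
    print(err)

# Stringifying first, as PyArrowData.convert_column now does, succeeds.
arr = pa.Array.from_pandas(df["id"].astype(str))
print(arr.type)  # string
```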

ibis/backends/impala/__init__.py (+3 −7)
@@ -1144,20 +1144,16 @@ def to_pyarrow(
         limit: int | str | None = None,
         **kwargs: Any,
     ) -> pa.Table:
-        import pyarrow as pa
         import pyarrow_hotfix  # noqa: F401

         from ibis.formats.pyarrow import PyArrowData

         self._run_pre_execute_hooks(expr)

         table_expr = expr.as_table()
-        output = pa.Table.from_pandas(
-            self.execute(table_expr, params=params, limit=limit, **kwargs),
-            preserve_index=False,
-        )
-        table = PyArrowData.convert_table(output, table_expr.schema())
-        return expr.__pyarrow_result__(table)
+        df = self.execute(table_expr, params=params, limit=limit, **kwargs)
+        pa_table = PyArrowData.convert_table(df, table_expr.schema())
+        return expr.__pyarrow_result__(pa_table)

     def to_pyarrow_batches(
         self,

ibis/backends/mysql/__init__.py (+3 −7)
@@ -26,6 +26,7 @@
 from ibis.backends.mysql.compiler import MySQLCompiler
 from ibis.backends.sql import SQLBackend
 from ibis.backends.sql.compiler import TRUE, C
+from ibis.formats.pyarrow import PyArrowData

 if TYPE_CHECKING:
     from collections.abc import Mapping
@@ -509,19 +510,14 @@ def to_pyarrow_batches(
         chunk_size: int = 1_000_000,
         **_: Any,
     ) -> pa.ipc.RecordBatchReader:
-        import pyarrow as pa
-
         self._run_pre_execute_hooks(expr)
-
         schema = expr.as_table().schema()
         with self._safe_raw_sql(
             self.compile(expr, limit=limit, params=params)
         ) as cursor:
             df = self._fetch_from_cursor(cursor, schema)
-        table = pa.Table.from_pandas(
-            df, schema=schema.to_pyarrow(), preserve_index=False
-        )
-        return table.to_reader(max_chunksize=chunk_size)
+        pa_table = PyArrowData.convert_table(df, schema)
+        return pa_table.to_reader(max_chunksize=chunk_size)

     def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
         import pandas as pd

ibis/backends/sqlite/__init__.py (+3 −6)
@@ -21,6 +21,7 @@
 from ibis.backends.sqlite.compiler import SQLiteCompiler
 from ibis.backends.sqlite.converter import SQLitePandasData
 from ibis.backends.sqlite.udf import ignore_nulls, register_all
+from ibis.formats.pyarrow import PyArrowData

 if TYPE_CHECKING:
     from collections.abc import Iterator, Mapping
@@ -269,19 +270,15 @@ def to_pyarrow_batches(
         chunk_size: int = 1_000_000,
         **_: Any,
     ) -> pa.ipc.RecordBatchReader:
-        import pyarrow as pa
-
         self._run_pre_execute_hooks(expr)

         schema = expr.as_table().schema()
         with self._safe_raw_sql(
             self.compile(expr, limit=limit, params=params)
         ) as cursor:
             df = self._fetch_from_cursor(cursor, schema)
-        table = pa.Table.from_pandas(
-            df, schema=schema.to_pyarrow(), preserve_index=False
-        )
-        return table.to_reader(max_chunksize=chunk_size)
+        pa_table = PyArrowData.convert_table(df, schema)
+        return pa_table.to_reader(max_chunksize=chunk_size)

     def _generate_create_table(self, table: sge.Table, schema: sch.Schema):
         column_defs = [

ibis/backends/tests/test_client.py (+6 −1)
@@ -1556,10 +1556,15 @@ def test_close_connection(con):
     reason="JSON type not implemented",
 )
 @pytest.mark.notimpl(
-    ["risingwave", "sqlite"],
+    ["risingwave"],
     raises=pa.ArrowTypeError,
     reason="mismatch between output value and expected input type",
 )
+@pytest.mark.notimpl(
+    ["sqlite"],
+    raises=pa.ArrowInvalid,
+    reason="cannot mix list and non-list, non-null values",
+)
 @pytest.mark.never(
     ["snowflake"],
     raises=TypeError,

ibis/backends/tests/test_uuid.py (+15)
@@ -3,6 +3,7 @@
 import contextlib
 import uuid

+import pyarrow as pa
 import pytest

 import ibis
@@ -63,3 +64,17 @@ def test_uuid_unique_each_row(con):
         con.tables.functional_alltypes.mutate(uuid=ibis.uuid()).limit(2).uuid.nunique()
     )
     assert expr.execute() == 2
+
+
+@pytest.mark.notimpl(
+    ["druid", "exasol", "oracle", "polars", "pyspark", "risingwave"],
+    raises=com.OperationNotDefinedError,
+)
+@pytest.mark.notimpl(
+    ["clickhouse", "postgres", "trino"],
+    reason="Expected bytes, got a 'UUID' object. https://github.com/ibis-project/ibis/issues/8902",
+    raises=pa.lib.ArrowTypeError,
+)
+@pytest.mark.notimpl(["pandas", "dask"], raises=com.OperationNotDefinedError)
+def test_uuid_to_pyarrow(con):
+    con.to_pyarrow(ibis.uuid())
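For reference, a hedged sketch of what the new test exercises on a backend carrying none of the marks above (DuckDB is my assumption here, chosen because it appears in no notimpl list):

```python
import ibis

con = ibis.duckdb.connect()  # in-memory connection; assumed available
result = con.to_pyarrow(ibis.uuid())
# pyarrow has no native UUID type, so the value is expected to surface
# as a string (see the convert_column change in ibis/formats/pyarrow.py).
print(result, result.type)
```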

ibis/formats/pyarrow.py (+18 −2)
@@ -2,6 +2,7 @@

 from typing import TYPE_CHECKING, Any

+import pandas as pd
 import pyarrow as pa
 import pyarrow_hotfix  # noqa: F401

@@ -243,15 +244,30 @@ def convert_scalar(cls, scalar: pa.Scalar, dtype: dt.DataType) -> pa.Scalar:
         return scalar

     @classmethod
-    def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
+    def convert_column(
+        cls, column: pa.Array | pd.Series, dtype: dt.DataType
+    ) -> pa.Array:
+        if isinstance(column, pd.Series):
+            if dtype.is_uuid():
+                # pyarrow doesn't support UUIDs, so we need to convert them to strings
+                column = column.astype(str)
+            column = pa.Array.from_pandas(column)
         desired_type = PyArrowType.from_ibis(dtype)
         if column.type != desired_type:
             return column.cast(desired_type)
         else:
             return column

     @classmethod
-    def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
+    def convert_table(cls, table: pa.Table | pd.DataFrame, schema: Schema) -> pa.Table:
+        if isinstance(table, pd.DataFrame):
+            table = pa.Table.from_arrays(
+                [
+                    cls.convert_column(table[col], dtype)
+                    for col, dtype in schema.items()
+                ],
+                names=schema.names,
+            )
         desired_schema = PyArrowSchema.from_ibis(schema)
         pa_schema = table.schema
