feat: support UUIDs to pyarrow on more backends #8901

Closed · wants to merge 1 commit
6 changes: 2 additions & 4 deletions ibis/backends/flink/__init__.py
@@ -24,6 +24,7 @@
 from ibis.backends.sql import SQLBackend
 from ibis.backends.tests.errors import Py4JJavaError
 from ibis.expr.operations.udf import InputType
+from ibis.formats.pyarrow import PyArrowData
 from ibis.util import gen_name
 
 if TYPE_CHECKING:
@@ -940,7 +941,6 @@
         limit: int | str | None = None,
         **kwargs: Any,
     ):
-        import pyarrow as pa
         import pyarrow_hotfix  # noqa: F401
 
         ibis_table = expr.as_table()
@@ -963,9 +963,7 @@
         # TODO (mehmet): `limit` is discarded in `execute()`. Is this intentional?
         df = df.head(limit)
 
-        ibis_schema = ibis_table.schema()
-        arrow_schema = ibis_schema.to_pyarrow()
-        arrow_table = pa.Table.from_pandas(df, schema=arrow_schema)
+        arrow_table = PyArrowData.convert_table(df, ibis_table.schema())
 
         return arrow_table.to_reader()
 
     def _from_ibis_table_to_pyflink_table(self, table: ir.Table) -> Table | None:
10 changes: 3 additions & 7 deletions ibis/backends/impala/__init__.py
@@ -1144,20 +1144,16 @@ def to_pyarrow(
         limit: int | str | None = None,
         **kwargs: Any,
     ) -> pa.Table:
-        import pyarrow as pa
         import pyarrow_hotfix  # noqa: F401
 
         from ibis.formats.pyarrow import PyArrowData
 
         self._run_pre_execute_hooks(expr)
 
         table_expr = expr.as_table()
-        output = pa.Table.from_pandas(
-            self.execute(table_expr, params=params, limit=limit, **kwargs),
-            preserve_index=False,
-        )
-        table = PyArrowData.convert_table(output, table_expr.schema())
-        return expr.__pyarrow_result__(table)
+        df = self.execute(table_expr, params=params, limit=limit, **kwargs)
+        pa_table = PyArrowData.convert_table(df, table_expr.schema())
+        return expr.__pyarrow_result__(pa_table)
 
     def to_pyarrow_batches(
         self,
10 changes: 3 additions & 7 deletions ibis/backends/mysql/__init__.py
@@ -26,6 +26,7 @@
 from ibis.backends.mysql.compiler import MySQLCompiler
 from ibis.backends.sql import SQLBackend
 from ibis.backends.sql.compiler import TRUE, C
+from ibis.formats.pyarrow import PyArrowData
 
 if TYPE_CHECKING:
     from collections.abc import Mapping
@@ -509,19 +510,14 @@ def to_pyarrow_batches(
         chunk_size: int = 1_000_000,
         **_: Any,
     ) -> pa.ipc.RecordBatchReader:
-        import pyarrow as pa
-
         self._run_pre_execute_hooks(expr)
 
         schema = expr.as_table().schema()
         with self._safe_raw_sql(
             self.compile(expr, limit=limit, params=params)
         ) as cursor:
             df = self._fetch_from_cursor(cursor, schema)
-        table = pa.Table.from_pandas(
-            df, schema=schema.to_pyarrow(), preserve_index=False
-        )
-        return table.to_reader(max_chunksize=chunk_size)
+        pa_table = PyArrowData.convert_table(df, schema)
+        return pa_table.to_reader(max_chunksize=chunk_size)
 
     def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
         import pandas as pd
9 changes: 3 additions & 6 deletions ibis/backends/sqlite/__init__.py
@@ -21,6 +21,7 @@
 from ibis.backends.sqlite.compiler import SQLiteCompiler
 from ibis.backends.sqlite.converter import SQLitePandasData
 from ibis.backends.sqlite.udf import ignore_nulls, register_all
+from ibis.formats.pyarrow import PyArrowData
 
 if TYPE_CHECKING:
     from collections.abc import Iterator, Mapping
@@ -269,19 +270,15 @@ def to_pyarrow_batches(
         chunk_size: int = 1_000_000,
         **_: Any,
     ) -> pa.ipc.RecordBatchReader:
-        import pyarrow as pa
-
         self._run_pre_execute_hooks(expr)
 
         schema = expr.as_table().schema()
         with self._safe_raw_sql(
             self.compile(expr, limit=limit, params=params)
         ) as cursor:
             df = self._fetch_from_cursor(cursor, schema)
-        table = pa.Table.from_pandas(
-            df, schema=schema.to_pyarrow(), preserve_index=False
-        )
-        return table.to_reader(max_chunksize=chunk_size)
+        pa_table = PyArrowData.convert_table(df, schema)
+        return pa_table.to_reader(max_chunksize=chunk_size)
 
     def _generate_create_table(self, table: sge.Table, schema: sch.Schema):
         column_defs = [
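For orientation, a minimal sketch — not code from this diff — of the shape the `to_pyarrow_batches` implementations above converge on. `_df_to_reader` is a hypothetical helper name; the point is that type coercion now lives in ibis' `PyArrowData` formatter rather than in each backend's call to `pa.Table.from_pandas`:

# Illustrative only: a hypothetical helper mirroring the pattern the backend
# methods above now share (pandas result frame + ibis schema -> RecordBatchReader).
import pandas as pd

from ibis.formats.pyarrow import PyArrowData


def _df_to_reader(df: pd.DataFrame, schema, chunk_size: int = 1_000_000):
    # convert_table coerces each column to the pyarrow type implied by the ibis
    # schema (e.g. UUID objects -> strings) before building the table, instead
    # of relying on pa.Table.from_pandas to infer and validate types itself.
    pa_table = PyArrowData.convert_table(df, schema)
    return pa_table.to_reader(max_chunksize=chunk_size)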
7 changes: 6 additions & 1 deletion ibis/backends/tests/test_client.py
@@ -1556,10 +1556,15 @@ def test_close_connection(con):
     reason="JSON type not implemented",
 )
 @pytest.mark.notimpl(
-    ["risingwave", "sqlite"],
+    ["risingwave"],
     raises=pa.ArrowTypeError,
     reason="mismatch between output value and expected input type",
 )
+@pytest.mark.notimpl(
+    ["sqlite"],
+    raises=pa.ArrowInvalid,
+    reason="cannot mix list and non-list, non-null values",
+)
 @pytest.mark.never(
     ["snowflake"],
     raises=TypeError,
15 changes: 15 additions & 0 deletions ibis/backends/tests/test_uuid.py
@@ -3,6 +3,7 @@
 import contextlib
 import uuid
 
+import pyarrow as pa
 import pytest
 
 import ibis
@@ -63,3 +64,17 @@ def test_uuid_unique_each_row(con):
         con.tables.functional_alltypes.mutate(uuid=ibis.uuid()).limit(2).uuid.nunique()
     )
     assert expr.execute() == 2
+
+
+@pytest.mark.notimpl(
+    ["druid", "exasol", "oracle", "polars", "pyspark", "risingwave"],
+    raises=com.OperationNotDefinedError,
+)
+@pytest.mark.notimpl(
+    ["clickhouse", "postgres", "trino"],
+    reason="Expected bytes, got a 'UUID' object. https://github.com/ibis-project/ibis/issues/8902",
+    raises=pa.lib.ArrowTypeError,
+)
+@pytest.mark.notimpl(["pandas", "dask"], raises=com.OperationNotDefinedError)
+def test_uuid_to_pyarrow(con):
+    con.to_pyarrow(ibis.uuid())
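The xfail reason for clickhouse/postgres/trino above is the underlying problem this PR works around elsewhere: pyarrow will not build a string column from raw `uuid.UUID` objects. A standalone repro sketch, not part of the diff:

# Sketch of the failure mode and of the stringify-first workaround the
# converter applies; the exact error text is an expectation, not verified here.
import uuid

import pyarrow as pa

vals = [uuid.uuid4(), uuid.uuid4()]

try:
    # Building a string array directly from UUID objects fails.
    pa.array(vals, type=pa.string())
except pa.lib.ArrowTypeError as err:
    print(err)  # e.g. "Expected bytes, got a 'UUID' object"

# Converting to str first (what PyArrowData.convert_column does for pandas
# Series with a uuid dtype in this PR) succeeds.
print(pa.array([str(v) for v in vals], type=pa.string()))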
20 changes: 18 additions & 2 deletions ibis/formats/pyarrow.py
@@ -2,6 +2,7 @@
 
 from typing import TYPE_CHECKING, Any
 
+import pandas as pd
 import pyarrow as pa
 import pyarrow_hotfix  # noqa: F401
 
@@ -243,15 +244,30 @@ def convert_scalar(cls, scalar: pa.Scalar, dtype: dt.DataType) -> pa.Scalar:
         return scalar
 
     @classmethod
-    def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
+    def convert_column(
+        cls, column: pa.Array | pd.Series, dtype: dt.DataType
+    ) -> pa.Array:
+        if isinstance(column, pd.Series):
+            if dtype.is_uuid():
+                # pyarrow doesn't support UUIDs, so we need to convert them to strings
+                column = column.astype(str)
+            column = pa.Array.from_pandas(column)
         desired_type = PyArrowType.from_ibis(dtype)
         if column.type != desired_type:
             return column.cast(desired_type)
         else:
             return column
 
     @classmethod
-    def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
+    def convert_table(cls, table: pa.Table | pd.DataFrame, schema: Schema) -> pa.Table:
+        if isinstance(table, pd.DataFrame):
+            table = pa.Table.from_arrays(
+                [
+                    cls.convert_column(table[col], dtype)
+                    for col, dtype in schema.items()
+                ],
+                names=schema.names,
+            )
         desired_schema = PyArrowSchema.from_ibis(schema)
         pa_schema = table.schema
 
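A usage sketch of the widened `convert_table`, assuming the behavior implied by the diff above; the printed schema is an expectation, not something verified here:

# Sketch: a pandas DataFrame holding uuid.UUID objects plus an ibis schema with
# a uuid column should come out as a pyarrow table with that column stringified.
import uuid

import pandas as pd

import ibis
from ibis.formats.pyarrow import PyArrowData

df = pd.DataFrame(
    {
        "id": [uuid.uuid4(), uuid.uuid4()],
        "n": [1, 2],
    }
)
schema = ibis.schema({"id": "uuid", "n": "int64"})

# With this PR a DataFrame is accepted directly: each Series goes through
# convert_column (UUIDs stringified, then pa.Array.from_pandas) and the arrays
# are assembled with pa.Table.from_arrays.
table = PyArrowData.convert_table(df, schema)
print(table.schema)  # expected: id: string, n: int64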