
Conversation

@devin-petersohn
Contributor

What changes were proposed in this pull request?

Add support for the PyCapsule and __dataframe__ protocols for interchange between Spark and other Python libraries. Here is a demo of what this enables with Polars and DuckDB:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 4.2.0-SNAPSHOT
      /_/

Using Python version 3.11.5 (main, Sep 11 2023 08:31:25)
Spark context Web UI available at http://192.168.86.83:4040
Spark context available as 'sc' (master = local[*], app id = local-1765227291836).
SparkSession available as 'spark'.

In [1]: import pyspark.pandas as ps
   ...: import pandas as pd
   ...: import numpy as np
   ...: import polars as pl
   ...: 
   ...: pdf = pd.DataFrame(
   ...:     {"A": [True, False], "B": [1, np.nan], "C": [True, None], "D": [None, np.nan]}
   ...: )
   ...: psdf = ps.from_pandas(pdf)
   ...: polars_df = pl.DataFrame(psdf)
/Users/dpetersohn/software_sources/spark/python/pyspark/pandas/__init__.py:43: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
  warnings.warn(
[Stage 0:>                                                          (0 + 1) / 1]
In [2]: polars_df
Out[2]: 
shape: (2, 5)
┌───────────────────┬───────┬──────┬──────┬──────┐
│ __index_level_0__ ┆ A     ┆ B    ┆ C    ┆ D    │
│ ---               ┆ ---   ┆ ---  ┆ ---  ┆ ---  │
│ i64               ┆ bool  ┆ f64  ┆ bool ┆ f64  │
╞═══════════════════╪═══════╪══════╪══════╪══════╡
│ 0                 ┆ true  ┆ 1.0  ┆ true ┆ null │
│ 1                 ┆ false ┆ null ┆ null ┆ null │
└───────────────────┴───────┴──────┴──────┴──────┘

In [3]: import duckdb

In [4]: import pyarrow as pa

In [5]: stream = pa.RecordBatchReader.from_stream(psdf)

In [6]: duckdb.sql("SELECT count(*) AS total, avg(B) FROM stream WHERE B IS NOT NULL").fetchall()
Out[6]: [(1, 1.0)]

Polars will now be able to consume a full PySpark DataFrame (or pyspark.pandas), and DuckDB can consume a stream built from the PySpark DataFrame. Importantly, the stream = pa.RecordBatchReader.from_stream(psdf) line does not trigger any computation; it simply creates a stream object that is incrementally consumed by DuckDB when the fetchall call is executed.
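
For context, here is a minimal sketch of how an object can expose the Arrow PyCapsule stream protocol by delegating to pyarrow (an illustration only, not this PR's actual implementation; LazyFrame and its batch source are hypothetical):

import pyarrow as pa

class LazyFrame:
    """Hypothetical producer that yields Arrow record batches on demand."""

    def __init__(self, schema, batch_iter_factory):
        self._schema = schema
        # Callable returning an iterator of pyarrow.RecordBatch; nothing is
        # computed until a consumer starts pulling from the stream.
        self._batch_iter_factory = batch_iter_factory

    def __arrow_c_stream__(self, requested_schema=None):
        # Wrap the lazy batch iterator in a RecordBatchReader and hand out its
        # ArrowArrayStream PyCapsule.
        reader = pa.RecordBatchReader.from_batches(self._schema, self._batch_iter_factory())
        return reader.__arrow_c_stream__(requested_schema)

# Any PyCapsule-aware consumer (pyarrow, Polars, DuckDB) can then read it lazily:
# pa.RecordBatchReader.from_stream(LazyFrame(schema, make_batches))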

Why are the changes needed?

Currently, PySpark (and to a lesser degree pyspark.pandas) does not integrate well with the broader Python ecosystem. The best practice today is to go through pandas with toPandas, but that materializes all of the data on the driver at once. This new API and protocol allow data to stream, one Arrow batch at a time, enabling libraries like DuckDB and Polars to consume the data as a stream.
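
As a rough illustration of driver-side consumption (assuming a DataFrame that exposes __arrow_c_stream__ as in this PR; the helper name is illustrative):

import pyarrow as pa

def count_rows_streaming(spark_df):
    # toPandas() would materialize the entire result on the driver at once;
    # from_stream() only sets up a reader and triggers no computation yet.
    reader = pa.RecordBatchReader.from_stream(spark_df)
    total = 0
    for batch in reader:  # each Arrow batch is produced and fetched on demand
        total += batch.num_rows
    return total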

Does this PR introduce any user-facing change?

Yes, new user-level API.

How was this patch tested?

Locally

Was this patch authored or co-authored using generative AI tooling?

No

Contributor

@allisonwang-db left a comment


@gaogaotiantian
Contributor

Okay, I think the title is a bit misleading - the PR is not about PyCapsule (which is a CPython concept for passing raw pointers around); it's about implementing __dataframe__ and __arrow_c_stream__ for the Spark DataFrame so it can be converted directly to other dataframes.

Also, I believe __dataframe__ and __arrow_c_stream__ are two different and almost unrelated protocols that we can treat separately.
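
(For reference, the two entry points look roughly like this; the signatures follow the respective specs, and the bodies are placeholders:)

class SomeDataFrame:
    def __dataframe__(self, nan_as_null=False, allow_copy=True):
        # DataFrame interchange protocol: returns an interchange object with
        # column/chunk accessors, typically consumed via a from_dataframe() helper.
        ...

    def __arrow_c_stream__(self, requested_schema=None):
        # Arrow PyCapsule interface: returns a PyCapsule wrapping an
        # ArrowArrayStream, consumed e.g. via pyarrow.RecordBatchReader.from_stream().
        ...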

Before digging into the code details too much, I think we need to discuss the general direction.

If we are going to implement either protocol, we probably want to do it right. The current implementation basically ignores a lot of the required methods and implements only the minimum needed to make from_dataframe work - we can start with that, but we need the commitment to make it correct eventually.

Then, about Arrow. Do we want to rely on undocumented Arrow classes for Spark to work? Both _PyArrowColumn and _PyArrowDataFrame are undocumented (not changing very actively, though) and subject to change at any time. And to take an extra step back - do we even want pyarrow to be a hard dependency? I think there were discussions about this when turning on arrow_by_default, but that can fall back to the old implementation. What about this?

If I understand the protocol correctly, you don't require Arrow at all to implement the __dataframe__ protocol - maybe that's a direction we should consider. Eventually we may still need some underlying library to hold the buffers, but the current implementation seems a bit unnecessary to me (I guess it's a proof of concept?).

Anyway, this is a very interesting feature, but also a big commitment. Users will expect this to work properly if we do it, so I think we need to give it more thought before going forward.

@devin-petersohn
Contributor Author

Thanks for the comment @gaogaotiantian. From my understanding, __arrow_c_stream__ is the interface for exporting PyCapsule objects (which this PR does) - https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html. Please feel free to suggest an alternative title; I am not tied to any title, as long as it is accurate.
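
A small illustration with a plain pyarrow Table (recent pyarrow versions), showing what the export and the usual consumption path look like:

import pyarrow as pa

table = pa.table({"x": [1, 2, 3]})
# The dunder hands out a PyCapsule wrapping an ArrowArrayStream C struct.
capsule = table.__arrow_c_stream__()
print(type(capsule).__name__)  # PyCapsule
# Consumers normally go through from_stream(), which accepts any object that
# implements __arrow_c_stream__ (including the Spark DataFrame in this PR).
reader = pa.RecordBatchReader.from_stream(table)
print(reader.read_all().num_rows)  # 3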

I am okay with removing __dataframe__ from this PR altogether, since that seems to be where the concerns are. In fact, __dataframe__ is already legacy and on its way toward being deprecated in pandas and many other libraries, although a lot of libraries still rely on it. Moving forward, projects are expected to conform to the PyCapsule interface (pandas-dev/pandas@865ae1d).

What do you think about removing __dataframe__ from this PR and focusing on the PyCapsule interface @gaogaotiantian?

@gaogaotiantian
Contributor

So I'm not that familiar with the standard itself - I don't see that the __dataframe__ standard is being deprecated; pandas is deprecating it, yes. However, in any case, I think it is helpful to separate the two implementations. They do not rely on each other, and each of them requires plenty of discussion.

From the Arrow documentation, the Arrow PyCapsule Interface is still experimental. I'm not sure what the policy is for features at that stage.

For the implementation itself, _get_arrow_array_partition_stream is a bit difficult to follow. It seems like the function is transforming a pyarrow.RecordBatch into a pyarrow.RecordBatch. I don't quite understand why it needs to convert the data into different formats and then yield the same type as the input. Maybe that's because of my lack of knowledge of Arrow.

@devin-petersohn
Contributor Author

For the implementation itself, _get_arrow_array_partition_stream is a bit difficult to follow. It seems like the function is transforming a pyarrow.RecordBatch into a pyarrow.RecordBatch. I don't quite understand why it needs to convert the data into different formats and then yield the same type as the input.

This is a requirement of PySpark's mapInArrow (https://github.com/apache/spark/pull/53391/files#diff-464b2c763de6d4a4d1bc63e7bd5816e786ad5e501ddcceb5df96408c72351cb3R36).
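
For context, mapInArrow takes a function from an iterator of pyarrow.RecordBatch to an iterator of pyarrow.RecordBatch, so the helper has to both consume and yield record batches. A minimal sketch of that shape (the per-batch body here is just a placeholder):

from typing import Iterator
import pyarrow as pa

def per_batch(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    for batch in batches:
        # Per-partition work goes here; whatever is yielded must again be a
        # RecordBatch matching the output schema declared to mapInArrow.
        yield batch

# df.mapInArrow(per_batch, df.schema)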

@devin-petersohn
Contributor Author

However, in any case, I think it is helpful to separate the two implementations.

Agreed, I will remove __dataframe__ from this PR.

@gaogaotiantian
Contributor

This is a requirement of PySpark's mapInArrow (https://github.com/apache/spark/pull/53391/files#diff-464b2c763de6d4a4d1bc63e7bd5816e786ad5e501ddcceb5df96408c72351cb3R36).

Yeah, I don't quite understand why that's the case. Like I said, that might be my lack of understanding of Arrow or PySpark. Someone else who is more familiar with the code might be able to say whether this is necessary. It seems like we copy/transform the data a few times.

byte_df = df.mapInArrow(batch_to_bytes_iter, binary_schema)
# A row is actually a batch of data in Arrow IPC format. Fetch the batches one by one.
for row in byte_df.toLocalIterator():
    with pyarrow.ipc.open_stream(row.arrow_ipc_bytes) as reader:
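
(For context, a hypothetical sketch of what a helper like batch_to_bytes_iter could look like; the PR's actual helper may differ. Each incoming RecordBatch is serialized to Arrow IPC stream bytes and wrapped in a single-column binary batch so it can travel as a row:)

from typing import Iterator
import io
import pyarrow as pa

def batch_to_bytes_iter(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    for batch in batches:
        sink = io.BytesIO()
        with pa.ipc.new_stream(sink, batch.schema) as writer:
            writer.write_batch(batch)
        # One output row per input batch, holding the IPC-serialized payload.
        yield pa.record_batch(
            [pa.array([sink.getvalue()], type=pa.binary())],
            names=["arrow_ipc_bytes"],
        )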
Contributor


In the future, I feel we can optimize out this mapInArrow UDF execution:
1. use some helper methods like Dataset.toArrowBatchRdd for the classic DataFrame;
2. use some internal methods inside spark.client.to_table for the Connect DataFrame.
This can be done in follow-up PRs.

Contributor Author


I agree, and we should make sure that we leverage the streaming nature of the PyCapsule protocol in the design.

devin-petersohn and others added 2 commits December 11, 2025 10:38
@github-actions bot added the BUILD label Dec 11, 2025
@github-actions bot removed the CORE label Dec 11, 2025