
Commit 19a1da9

Yicong-Huang authored and Hyukjin Kwon committed
[SPARK-54639][PYTHON] Avoid unnecessary Table creation in Arrow serializers
### What changes were proposed in this pull request?

Optimize Arrow serializers (`ArrowStreamPandasSerializer` and `GroupPandasUDFSerializer`) by avoiding unnecessary `pa.Table` creation when processing single `RecordBatch` instances.

The optimization replaces `pa.Table.from_batches([batch]).itercolumns()` with direct column access using `batch.column(i)` for single batches. This eliminates unnecessary Table and iterator object creation, reducing function call overhead and GC pressure.

**Changes:**
- `ArrowStreamPandasSerializer.load_stream()`: Direct column access instead of creating a Table wrapper
- `GroupPandasUDFSerializer.load_stream()`: Direct column access for each batch

**Code example:**

```python
# Before (ArrowStreamPandasSerializer.load_stream)
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(c, i)
        for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
    ]

# After
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
    ]
```

### Why are the changes needed?

Several serializers in `pyspark.sql.pandas.serializers` unnecessarily create `pa.Table` objects when processing single `RecordBatch` instances. When converting Arrow RecordBatches to pandas Series, the code creates a `pa.Table` wrapper for each batch just to iterate over its columns, which introduces:

- Unnecessary object creation (Table objects and iterators)
- Extra function call overhead
- Increased GC pressure

For a workload processing 1000 batches with 10 columns each, this change avoids creating 2000 temporary objects (1000 Table objects + 1000 iterators). `RecordBatch.column(i)` returns the column array reference directly (zero-copy), reducing function call overhead.

### Does this PR introduce _any_ user-facing change?

No. This is a performance optimization that maintains backward compatibility. The serialization behavior remains the same; only the internal implementation is optimized.

### How was this patch tested?

Existing tests pass without modification.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53387 from Yicong-Huang/SPARK-54639/feat/optimize-arrow-serializers.

Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 19ec991 commit 19a1da9
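To make the overhead claim in the description concrete, here is an illustrative micro-benchmark. It is not part of the patch; it assumes only that `pyarrow` is installed, and the 10-column batch is a hypothetical shape chosen to mirror the example workload above. It times the old Table-wrapper pattern against direct `batch.column(i)` access:

```python
# Illustrative sketch, not from the PR: compare the two column-access patterns
# on a single 10-column RecordBatch.
import timeit

import pyarrow as pa

batch = pa.RecordBatch.from_pydict({f"c{i}": list(range(1_000)) for i in range(10)})


def via_table():
    # Old pattern: wrap the single batch in a pa.Table just to iterate its
    # columns; itercolumns() yields ChunkedArray objects.
    return list(pa.Table.from_batches([batch]).itercolumns())


def via_column():
    # New pattern: index the RecordBatch directly; column(i) returns the
    # underlying column reference without creating a Table or an iterator.
    return [batch.column(i) for i in range(batch.num_columns)]


print("Table wrapper :", timeit.timeit(via_table, number=10_000))
print("Direct access :", timeit.timeit(via_column, number=10_000))
```

Both paths hand equivalent column data to `arrow_to_pandas` (a single-chunk `ChunkedArray` versus the `Array` itself); the saving is purely the per-batch `Table` and iterator allocations.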


python/pyspark/sql/pandas/serializers.py

Lines changed: 2 additions & 5 deletions
```diff
@@ -500,13 +500,11 @@ def load_stream(self, stream):
         Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
         """
         batches = super().load_stream(stream)
-        import pyarrow as pa
         import pandas as pd
 
         for batch in batches:
             pandas_batches = [
-                self.arrow_to_pandas(c, i)
-                for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
             ]
             if len(pandas_batches) == 0:
                 yield [pd.Series([pyspark._NoValue] * batch.num_rows)]
@@ -1225,8 +1223,7 @@ def process_group(batches: "Iterator[pa.RecordBatch]"):
             for batch in batches:
                 # The batch from ArrowStreamSerializer is already flattened (no struct wrapper)
                 series = [
-                    self.arrow_to_pandas(c, i)
-                    for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                    self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
                 ]
                 yield series
 
```
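As a sanity check on the diff above, a small standalone sketch (again illustrative, not part of the patch; it assumes `pyarrow` and `pandas` are installed and does not touch PySpark internals) confirming that the old and new column-access paths yield identical pandas Series for a single `RecordBatch`:

```python
# Illustrative equivalence check, not part of the patch.
import pandas as pd
import pyarrow as pa

batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Old path: Table wrapper around the single batch, iterating ChunkedArrays.
old = [c.to_pandas() for c in pa.Table.from_batches([batch]).itercolumns()]

# New path: index the batch's columns directly; no Table or iterator is created.
new = [batch.column(i).to_pandas() for i in range(batch.num_columns)]

for s_old, s_new in zip(old, new):
    pd.testing.assert_series_equal(s_old, s_new)

print("old and new column-access paths produce identical Series")
```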