diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 43a42d7fc3b4..f7597c3bdafa 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -500,13 +500,11 @@ def load_stream(self, stream):
         Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
         """
         batches = super().load_stream(stream)
-        import pyarrow as pa
         import pandas as pd
 
         for batch in batches:
             pandas_batches = [
-                self.arrow_to_pandas(c, i)
-                for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
             ]
             if len(pandas_batches) == 0:
                 yield [pd.Series([pyspark._NoValue] * batch.num_rows)]
@@ -1225,8 +1223,7 @@ def process_group(batches: "Iterator[pa.RecordBatch]"):
             for batch in batches:
                 # The batch from ArrowStreamSerializer is already flattened (no struct wrapper)
                 series = [
-                    self.arrow_to_pandas(c, i)
-                    for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
+                    self.arrow_to_pandas(batch.column(i), i) for i in range(batch.num_columns)
                 ]
                 yield series
 
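
A minimal standalone sketch (not part of the patch) of the equivalence the change relies on: indexing a pa.RecordBatch directly yields the same column data as wrapping it in a one-batch pa.Table first, while skipping the per-batch Table construction. The column types differ slightly (Table.itercolumns() yields ChunkedArray, RecordBatch.column() yields Array), but both expose to_pandas() and to_pylist(). The toy batch below is hypothetical, for illustration only.

import pyarrow as pa

# Hypothetical toy batch standing in for one batch from the Arrow stream.
batch = pa.RecordBatch.from_pydict({"a": [1, 2], "b": ["x", "y"]})

# Old path: build a one-batch Table and iterate its columns (ChunkedArrays).
old_cols = list(pa.Table.from_batches([batch]).itercolumns())

# New path: index the batch directly (plain Arrays, no intermediate Table).
new_cols = [batch.column(i) for i in range(batch.num_columns)]

# Same column values either way.
assert [c.to_pylist() for c in old_cols] == [c.to_pylist() for c in new_cols]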