
Conversation

Contributor

@zeruibao zeruibao commented Sep 18, 2025

What changes were proposed in this pull request?

Limit the byte size of Arrow batch for TWS to avoid OOM.

Why are the changes needed?

On the Python worker side, when using the Pandas execution path, Arrow batches must be converted into Pandas DataFrames in memory. If an Arrow batch is too large, this conversion can lead to OOM errors in the Python worker. To mitigate this risk, we need to enforce a limit on the byte size of each Arrow batch. Similarly, processing the Pandas DataFrame inside handleInputRows also occurs entirely in memory, so applying a size limit to the DataFrame itself further helps prevent OOM issues.
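A minimal sketch of that idea, using hypothetical names like rows_to_dataframes and max_bytes_per_chunk (not the PR's actual code):

import sys

import pandas as pd

def rows_to_dataframes(rows, max_bytes_per_chunk):
    # Yield pandas DataFrames whose estimated byte size stays under the limit.
    buffered, buffered_bytes = [], 0
    for row in rows:
        buffered.append(row)
        # Rough per-row estimate; the review discussion covers better estimators.
        buffered_bytes += sum(sys.getsizeof(x) for x in row)
        if buffered_bytes >= max_bytes_per_chunk:
            yield pd.DataFrame(buffered)
            buffered, buffered_bytes = [], 0
    if buffered:
        yield pd.DataFrame(buffered)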

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

Was this patch authored or co-authored using generative AI tooling?

No

Contributor

@HeartSaVioR HeartSaVioR left a comment


@zeruibao
The code change looks good to me. Thanks! Let's hear what the PySpark folks think about additionally limiting the size of the Pandas DataFrame on top of limiting the size of the Arrow RecordBatch.

cc. @HyukjinKwon @zhengruifeng PTAL. Thanks in advance!

for batch_key, group_rows in groupby(row_stream(), key=lambda x: x[0]):
    df = pd.DataFrame([row for _, row in group_rows])
    yield (batch_key, df)
rows = []
Contributor


@HyukjinKwon @zhengruifeng
What do you think about the code? We limit the size of the Arrow RecordBatch in the task thread when sending to the Python worker, and @zeruibao added this to re-apply the size limit when building the Pandas DataFrame. Did we do this in other UDFs? Is it beneficial, or probably over-thinking?

Contributor


Did we do this in other UDFs?

I don't think so

Is it beneficial or probably over-thinking?

I remember @HyukjinKwon discussed this before; it should be beneficial if the size is properly estimated.

@HeartSaVioR HeartSaVioR changed the title from [SPARK-53638] Limit the byte size of arrow batch for TWS to avoid OOM to [SPARK-53638][SS][PYTHON] Limit the byte size of arrow batch for TWS to avoid OOM Sep 24, 2025
# Short circuit batch size calculation if the batch size is
# unlimited as computing batch size is computationally expensive.
if self.arrow_max_bytes_per_batch != 2**31 - 1:
    accumulate_size += sum(sys.getsizeof(x) for x in row)
Contributor


TBH, I am not sure about the way to compute the batch size in the python side.

cc @HyukjinKwon @viirya

Member


Hmm, is sys.getsizeof accurate for nested type columns?
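For reference, sys.getsizeof is shallow: it reports only the outer container's own overhead, not the memory referenced by its elements, so nested values are undercounted. For example:

import sys

nested = [list(range(1000)) for _ in range(10)]
print(sys.getsizeof(nested))     # only the outer list of 10 references
print(sys.getsizeof(nested[0]))  # a single inner list is already much larger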

Member


Btw, the rows are not arrow batches anymore here, right? Using the arrow_max_bytes_per_batch config might be misleading.

Contributor Author


Yeah, I agree that sys.getsizeof is not accurate for nested type columns. Let me try another way.

Contributor Author

@zeruibao zeruibao Oct 2, 2025


I got a better idea: compute the average row size for each Arrow RecordBatch. When accumulating rows into a Pandas DataFrame, we can then use this average row size as an estimate for each row, avoiding the need to measure every individual row. We can also keep using the arrow_max_bytes_per_batch config, and it's more accurate than sys.getsizeof. cc: @HeartSaVioR
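A minimal sketch of that estimation, assuming batch is a pyarrow.RecordBatch (illustrative only, not the PR's code):

import pyarrow as pa

def average_row_size(batch: pa.RecordBatch) -> float:
    # Estimate bytes per row from the batch's Arrow buffers.
    if batch.num_rows == 0:
        return 0.0
    return batch.nbytes / batch.num_rows

# While accumulating rows toward a pandas DataFrame, charge each row the
# batch-level average instead of measuring it with sys.getsizeof.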

Contributor Author

zeruibao commented Oct 1, 2025

Hi @HyukjinKwon, I have done a benchmark showing that there is no regression introduced by this PR. Is this PR good to merge now?

Member

@HyukjinKwon HyukjinKwon left a comment


Looks fine but I would like @HeartSaVioR and @zhengruifeng to sign off as well.

arrow_max_bytes_per_batch,
int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
)
elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF:
Member


Doesn't SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF need it too?

Contributor Author


SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF follows a separate execution path. Each Arrow batch contains rows with the same key, which allows us to directly convert the batch into a Pandas DataFrame and yield it. Since the Arrow batch size is already subject to a byte-size limit, the resulting Pandas DataFrame also inherently respects this constraint.
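A minimal sketch of that per-key path, assuming batches is an iterator of pyarrow.RecordBatch with one key per batch (illustrative, not the actual serializer code):

def batches_to_dataframes(batches):
    for batch in batches:
        # Each batch already respects the Arrow byte-size limit, so the
        # resulting pandas DataFrame inherits the same bound.
        yield batch.to_pandas()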

rows.append(row)
# Short circuit batch size calculation if the batch size is
# unlimited as computing batch size is computationally expensive.
if self.arrow_max_bytes_per_batch != 2**31 - 1:
Member


I saw that you applied the batch size limit on the Scala side; why do we need to duplicate the limit here?

Member


Oh, the serializer groups the rows by key here.

Contributor Author


yes!

Contributor


Actually, if we were able to break large data for a grouping key into multiple Arrow RecordBatches, each RecordBatch could be converted into a Pandas DataFrame and the DataFrames could be chained with an iterator/generator, which would not require materializing multiple Arrow RecordBatches at once. That's how we do it in applyInPandasWithState (see the sketch at the end of this comment).

I can think of a couple of cases where this logic would help:

  1. Very little data is bound to a single grouping key, but it is split across two Arrow RecordBatches.

In this case we would be able to provide one Pandas DataFrame instead of two.

  2. Converting an Arrow RecordBatch to a Pandas DataFrame could have a very different memory footprint.

I don't have evidence for this and it's just a speculative justification for the change: the size of the data in an Arrow RecordBatch for a grouping key could differ from the result of converting it to a Pandas DataFrame.

If the size of an Arrow RecordBatch is very similar to that of its Pandas DataFrame conversion, maybe taking the applyInPandasWithState route would still be OK. That said, I'm fine with the argument that this logic is simpler than applyInPandasWithState, assuming sys.getsizeof is accurate.
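A minimal sketch of that alternative, assuming keyed_batches is an iterator of (key, pyarrow.RecordBatch) pairs arriving contiguously per key (illustrative, not the applyInPandasWithState implementation):

from itertools import groupby

def per_key_dataframe_iterators(keyed_batches):
    # Group contiguous batches for the same key and expose them as a lazy
    # iterator of pandas DataFrames, so the batches for one key are never
    # materialized at the same time.
    for key, group in groupby(keyed_batches, key=lambda kv: kv[0]):
        yield key, (batch.to_pandas() for _, batch in group)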

Contributor Author


Since we already leverage the generator-based approach to optimize for high-cardinality scenarios, switching to the applyInPandasWithState approach would not be a trivial change. I would prefer to retain the current implementation.

@zeruibao zeruibao requested review from HeartSaVioR and viirya October 2, 2025 01:23
Contributor Author

zeruibao commented Oct 2, 2025

Hey @viirya, could you sign off on this PR if it looks good to you?
