[SPARK-50856][SS][PYTHON][CONNECT] Spark Connect Support for TransformWithStateInPandas In Python #49560

Closed
wants to merge 25 commits

Conversation

Contributor

@jingz-db jingz-db commented Jan 18, 2025

What changes were proposed in this pull request?

Support TransformWithStateInPandas in Spark Connect mode.

Why are the changes needed?

As Spark Connect is becoming the default in Spark 4.0, we need to add Connect support for TransformWithState (TWS) in Python.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests in test_pandas_transform_with_state.

Was this patch authored or co-authored using generative AI tooling?

No.

Contributor

@bogao007 bogao007 left a comment

We might also need to follow PR #48560 for the Spark Classic and Spark Connect compatibility check.

Comment on lines 1038 to 1040
val input = Dataset
.ofRows(session, transformRelation(rel.getInput))
.groupBy(cols: _*)
Contributor

Nit: can we share this part across both branches to avoid some duplicated code?

@@ -26,3 +42,185 @@ class TransformWithStateInPandasFuncMode(Enum):
PROCESS_TIMER = 2
COMPLETE = 3
PRE_INIT = 4


class TransformWithStateInPandasUdfUtils:
Contributor

I see we are sharing this UDF definition util between the client and the server; would that introduce any compatibility issues, since it was copied directly from the server? E.g., pyspark.sql.pandas._typing is a server-side module, while on the client side we use pyspark.sql.connect._typing. cc @HyukjinKwon who may know more about this.

Member

Nope, typing is optional, so it won't have any side effect during execution.
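For illustration, a minimal sketch of why a typing-only import is harmless at runtime; the guarded import below is an assumption for the example, not the exact code in this PR:

from typing import TYPE_CHECKING

# TYPE_CHECKING is False at runtime, so this block is only read by static
# type checkers and never executes; it cannot cause client/server mismatches.
if TYPE_CHECKING:
    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike


def process(df: "PandasDataFrameLike") -> "PandasDataFrameLike":
    # The quoted annotation is never evaluated during execution.
    return df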

@@ -1096,6 +1096,7 @@ def __hash__(self):
"pyspark.sql.tests.connect.pandas.test_parity_pandas_udf_scalar",
"pyspark.sql.tests.connect.pandas.test_parity_pandas_udf_grouped_agg",
"pyspark.sql.tests.connect.pandas.test_parity_pandas_udf_window",
"pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state",
Contributor

Have we verified that this test actually runs in CI?

Contributor Author

@jingz-db jingz-db Feb 1, 2025

Yeah, I think so. I got several failed test cases for this suite in a previous CI run: https://github.com/jingz-db/spark/actions/runs/13039529632/job/36378113583#step:12:4144. They are now fixed, but that failure verifies the suite is actually running in CI.

Contributor

Great, thanks for confirming!


self._test_transform_with_state_in_pandas_basic(
SimpleTTLStatefulProcessor(), check_results, False, "processingTime"
)

# TODO SPARK-50908 holistic fix for TTL suite
@unittest.skip("test is flaky and it is only a timing issue, skipping until we can resolve")
Contributor

Are we trying to fix the flaky TTL test here? If so, it may be better to create a separate PR for it and track it under that TODO JIRA?

Contributor

Yeah, if it's not related to the Connect change, let's file a separate PR for this.

Contributor Author

Thanks! Will file a separate PR for fixing the suite.

@jingz-db jingz-db requested a review from bogao007 February 3, 2025 21:01
)
initial_state_plan = None
initial_state_grouping_cols = None

Contributor

Nit: do we need a newline here?

@@ -2546,6 +2546,70 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
return self._with_relations(plan, session)


class TransformWithStateInPandas(LogicalPlan):
Contributor

Could we add some comments here?

Contributor Author

Do you mean a class docstring? I have a one-line docstring at line 2550.

import warnings

from pyspark.errors import PySparkTypeError
from pyspark.util import PythonEvalType
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.streaming.state import GroupStateTimeout
from pyspark.sql.streaming.stateful_processor_api_client import (
Contributor

We don't need this now?

Contributor Author

Yeah, we don't need it now. I moved the UDF logic out of this file into a separate file so that Spark Connect and classic mode can share the UDF function.

Row(id="0", countAsString="2"),
Row(id="1", countAsString="2"),
}
else:
Contributor

Are we removing the else part entirely?

Contributor Author

We don't need to explicitly end the stream from the forEachBatch (FEB) sink now, because I added the "spark.sql.streaming.noDataMicroBatches.enabled" = "false" config, so the stream ends without the else part.
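For context, a minimal sketch of this pattern (assumed names: df is the streaming DataFrame under test and check_results holds the assertions, as in this suite). With no-data micro-batches disabled, processAllAvailable() returns once the source is drained, so the foreachBatch sink never has to stop the query itself:

spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")

def check_results(batch_df, batch_id):
    batch_df.collect()  # per-batch assertions would go here

q = df.writeStream.foreachBatch(check_results).outputMode("update").start()
q.processAllAvailable()  # no empty micro-batches follow the drained input
q.stop()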

Contributor

Cool thanks

Contributor

I'd rather suggest using Trigger.AvailableNow, which gives what you want for start() -> processAllAvailable() -> stop(). Though you may still want to disable noDataBatch if you want to simplify.

Contributor Author

Trigger.AvailableNow would also require us to explicitly stop the query inside the sink if noDataBatch is not enabled. See the discussion we had previously on the chaining-of-operators PR: #48124 (comment)

Contributor

That is only when we use processing-time mode, but yeah, I don't have a strong opinion. It's fine to leave it as-is and address the issue consistently.
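For comparison, a sketch of the suggested alternative (same assumed names as the sketch above): an availableNow trigger processes everything currently available and then stops the query on its own, so awaitTermination() returns without an explicit stop() from the sink:

q = (
    df.writeStream
    .foreachBatch(check_results)
    .outputMode("update")
    .trigger(availableNow=True)
    .start()
)
q.awaitTermination()  # the query terminates itself after draining the input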


// (Optional) Schema for the output DataFrame.
// Only used for TransformWithStateInPandas.
optional string output_schema = 4;
Contributor Author

To reviewers: this output_schema is the only optional field specific to Python. The remaining fields are shared between Python and Scala for TransformWithState.

Contributor

How does Scala infer its schema? Can you please make this a DataType and use Unparsed for Python.

Contributor Author

This schema is only used for Python. Scala is a typed language, so there we can infer the output schema from the output row type, and this entry is not needed.

Contributor Author

Just revised this to an optional DataType. Thanks for the comment!
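For context, a sketch of what this means at the Python API level (mirroring the tests in this PR): since Python cannot infer the UDF's output type, the caller supplies the schema explicitly, either as a DDL string or a StructType:

from pyspark.sql.types import StructType, StructField, StringType

# Two equivalent ways to supply the schema that Scala would infer from the
# output row type (illustrative; how it is plumbed through is omitted).
output_schema = "id string, value string"
output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("value", StringType(), True),
    ]
)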

}

// Additional input parameters used for TransformWithState operator.
message TransformWithStateInfo {
Contributor

Ah OK, so we use GroupMap to represent MapGroupsWithState/FlatMapGroupsWithState/ApplyInPandas/ApplyInPandasWithState/TransformWithState altogether?

I don't know the convention for Spark Connect, so I'll probably need help from @hvanhovell to audit the proto change.

Contributor

I think for ApplyInPandasWithState, we have a separate message. When I was implementing FlatMapGroupsWithState, I was advised to use the existing GroupMap message since they share a lot in common.

For this case, though, I think it might make more sense to have a separate message for TransformWithState. @hvanhovell, which way is more recommended here in Spark Connect?

Contributor

I think this is fine. In all cases this represents more or less the same thing: you apply an arbitrary transform to grouped data.

Contributor

BTW, I think this starts to break down when there is too little overlap between the various implementations. So I suppose the question is: how different is TransformWithState from, for example, FlatMapGroupsWithState?

Contributor Author

In the GroupMap proto schema here, TransformWithState and FlatMapGroupsWithState share the following six fields:

// (Required) Input relation for Group Map API: apply, applyInPandas.
Relation input = 1;

// (Required) Expressions for grouping keys.
repeated Expression grouping_expressions = 2;

// (Required) Input user-defined function.
CommonInlineUserDefinedFunction func = 3;

// (Optional) Expressions for sorting. Only used by Scala Sorted Group Map API.
repeated Expression sorting_expressions = 4;

// Below fields are only used by (Flat)MapGroupsWithState
// (Optional) Input relation for initial State.
Relation initial_input = 5;

// (Optional) Expressions for grouping keys of the initial state input relation.
repeated Expression initial_grouping_expressions = 6;

And TransformWithState has three additional fields, defined in TransformWithStateInfo, that are not used by FMGWS. FMGWS has the following three fields that are likewise not used by TransformWithState:

// (Optional) True if MapGroupsWithState, false if FlatMapGroupsWithState.
optional bool is_map_groups_with_state = 7;

// (Optional) Timeout configuration for groups that do not receive data for a while.
optional string timeout_conf = 9;

// (Optional) The schema for the grouped state.
optional DataType state_schema = 10;

So there is still slightly more benefit in shared fields from keeping TransformWithState inside GroupMap.

Contributor

GroupMap may also be used for a normal "flatMapGroups" without state, so ideally we would have just one optional field in GroupMap, such as:

oneof state_info {
    MapGroupsWithStateInfo map_groups_with_state_info = xx;
    TransformWithStateInfo transform_with_state_info = yy;
}

And state_info would clearly indicate three different types:

  1. when it is not set, it is a normal "flatMapGroups"
  2. when it is MapGroupsWithStateInfo, it's for "MapGroupsWithState" or "FlatMapGroupsWithState"
  3. otherwise, it's for transformWithState.

This would look better; however, it is not backward compatible.

Contributor Author

This would look better; however, it is not backward compatible.

Yeah, agreed... I guess it is not very safe to change the Connect protocol for a released operator. We should probably leave FMGWS as it is in this PR.

Contributor

@haiyangsun-db you could make it compatible by retaining the original fields. The only problem is that you would then have two code paths to maintain in the planner.

@jingz-db we only have to be backward compatible (an older client should work with a newer server). As long as we make additive changes, we should be good.

Contributor Author

@jingz-db jingz-db Feb 25, 2025

Thanks for the discussion above! Maintaining two code paths seems too expensive, so we should probably keep FMGWS as it is and give TWS a separate schema. That way, if we add a new operator in the future, it can easily follow what Haiyang suggested above by making an additive change.

Contributor

@HeartSaVioR HeartSaVioR left a comment

First pass. Looks good overall.

@@ -1180,6 +1179,7 @@ def load_stream(self, stream):
this function works in overall.
"""
import pyarrow as pa
from pyspark.sql.streaming.stateful_processor_util import TransformWithStateInPandasFuncMode
Contributor

Just curious, any specific reason to descope the import?

Contributor Author

@jingz-db jingz-db Feb 12, 2025

We hit a circular dependency, so the import has to be descoped:

File "/spark/python/pyspark/sql/pandas/serializers.py", line 39, in <module>
    from pyspark.sql.streaming.stateful_processor_util import TransformWithStateInPandasFuncMode
  File "/spark/python/pyspark/sql/streaming/stateful_processor_util.py", line 21, in <module>
    from pyspark.sql.streaming.stateful_processor_api_client import (
ImportError: cannot import name 'StatefulProcessorApiClient' from partially initialized module 'pyspark.sql.streaming.stateful_processor_api_client' (most likely due to a circular import) (/spark/python/pyspark/sql/streaming/stateful_processor_api_client.py)
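A sketch of the workaround: a function-scoped ("descoped") import is resolved at call time, after both modules have finished initializing, which breaks the module-level cycle shown in the traceback above (method body abbreviated for illustration):

def load_stream(self, stream):
    from pyspark.sql.streaming.stateful_processor_util import (
        TransformWithStateInPandasFuncMode,
    )
    # ... deserialize the stream, dispatching on TransformWithStateInPandasFuncMode ...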

@@ -376,7 +363,7 @@ def transformWithStateInPandas(
timeMode: str,
initialState: Optional["GroupedData"] = None,
eventTimeColumnName: str = "",
) -> DataFrame:
) -> "DataFrame":
Contributor

Also curious, what's the effect of the quotes ("") on the type name?

Contributor Author

I was failing this CI doc-compatibility check: #49560 (review). The extra quotes ("") make the check pass.
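For reference, a minimal illustration (not PySpark code) of what the quotes do: a quoted annotation is a forward reference, stored as a string and never evaluated at definition time, so it can name a class defined later or one that differs between client and server:

class GroupedData:
    def to_df(self) -> "DataFrame":  # quoted: no NameError even though
        return DataFrame()           # DataFrame is defined below

class DataFrame:
    pass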

self._stateful_processor = stateful_processor
self._time_mode = time_mode

def transformWithStateUDF(
Contributor

I assume you just copied and pasted, but please highlight if there was also a code change other than copying. It's hard to tell whether there is also a refactor.

Contributor Author

@jingz-db jingz-db Feb 12, 2025

Sure, let me add a few notes for reviewers inline!


@classmethod
def conf(cls):
cfg = SparkConf(loadDefaults=False)
Contributor

Is this entire thing just to remove the config "spark.master"? Just wanted to understand what we are doing here.

Contributor Author

Yeah... conf(cls) is overridden in both ReusedConnectTestCase and TransformWithStateInPandasTestsMixin. I hit an exception complaining that spark.master cannot be set at the same time as "SPARK_CONNECT_TESTING_REMOTE" = "local[4]". This is because ReusedConnectTestCase does this:

@classmethod
def conf(cls):
    conf = SparkConf(loadDefaults=False)
    # Make the server terminate reattachable streams every 1 second and 123 bytes,
    # to make the tests exercise reattach.
    if conf._jconf is not None:
        conf._jconf.remove("spark.master")
    # other configs are added here

While TransformWithStateInPandasTestsMixin does this:

@classmethod
def conf(cls):
    cfg = SparkConf()
    # other configs are added here

I suspect I was hitting the issue because it is nondeterministic which function executes first. After explicitly constructing the conf here, adding all the configs from the base classes, and removing "spark.master", the exception goes away.
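A hypothetical reconstruction of the fix described above: build the SparkConf from scratch so neither base class's conf() is relied upon, re-adding the configs the bases would set (subset shown) and dropping spark.master, which conflicts with SPARK_CONNECT_TESTING_REMOTE:

from pyspark import SparkConf

@classmethod
def conf(cls):
    cfg = SparkConf(loadDefaults=False)
    cfg.set(
        "spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch",
        "2",
    )
    cfg.set("spark.sql.session.timeZone", "UTC")
    if cfg._jconf is not None:
        cfg._jconf.remove("spark.master")  # clashes with the Connect remote setting
    return cfg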

Contributor

Ah OK, that is due to multiple inheritance at the same level. Probably better to leave a note about this; thanks for the explanation.

@@ -67,6 +67,8 @@ def conf(cls):
)
cfg.set("spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch", "2")
cfg.set("spark.sql.session.timeZone", "UTC")
# TODO SPARK-50180 this config is to stop query from FEB sink gracefully
Contributor

This refers to a JIRA ticket that is marked as "Duplicated". Shall we update it?

Contributor Author

Hmm, I did not know this ticket was closed until now. Let me check with Anish; I don't think we have this issue fixed yet.

Contributor Author

https://issues.apache.org/jira/browse/SPARK-49046 is the duplicate of SPARK-50180. I added a comment on SPARK-50180 pointing to SPARK-49046.

Contributor

Let's change this comment to SPARK-49046 as well.

Row(id="0", countAsString="2"),
Row(id="1", countAsString="2"),
}
else:

Contributor

Most of the changes are refactoring, do I understand correctly? I just wanted to avoid a line-by-line, side-by-side comparison.

Contributor

Again, please highlight if there was also a code change other than copying. It's hard to tell whether there is also a refactor.

Contributor Author

Thanks! Most of the changes are refactoring. Let me add a few comments below highlighting the non-trivial changes.

@HeartSaVioR
Contributor

@hvanhovell Please take a look at this PR if you have time. I think this is simpler than the Scala PR, and many things follow the existing practice from applyInPandasWithState. Thanks in advance!

self._stateful_processor = stateful_processor
self._time_mode = time_mode

def transformWithStateUDF(
Contributor Author

To reviewers: transformWithStateUDF and transformWithStateWithInitStateUDF were moved from group_ops.py. There is no code difference except a minor rename of internal parameters; e.g., stateful_processor_api_client was previously named statefulProcessorApiClient. We should use snake_case for internal function parameters, so I made that naming change here.


return result

def _handle_pre_init(
Contributor Author

To reviewers: _handle_pre_init, _handle_data_rows, and _handle_expired_timers were moved from group_ops.py. There is no code difference except the same internal parameter rename (statefulProcessorApiClient -> stateful_processor_api_client). These functions are private because only transformWithStateUDF and transformWithStateWithInitStateUDF are used in group_ops.py and group.py; the rest should not be exposed to external classes.

assert set(batch_df.sort("id").select("id", "countAsString").collect()) == {
Row(id="0", countAsString="3"),
Row(id="0", countAsString="-1"),
Row(id="1", countAsString="5"),
}
# The expired timestamp in current batch is larger than expiry timestamp in batch 1
Contributor Author

This check was removed after the Connect suites were added. The extra check is good to have but not necessary, and it needs access to self.first_expired_timestamp, which is not doable in the Spark Connect suite. The check mostly verifies that the expired timer timestamp in batch 1 is smaller than the expired timer timestamp in batch 2.

StructField("value", StringType(), True),
]
)
output_schema = "id string, value string"
Contributor Author

To reviewers: this is a non-trivial code change. I found we were missing test coverage for passing a string schema as output_schema, so I added it here.

Contributor

Thanks Jing! We didn't change anything around the output schema for TWS, but still worth adding the coverage!

@jingz-db jingz-db requested a review from HeartSaVioR February 12, 2025 22:10
Contributor

@bogao007 bogao007 left a comment

LGTM overall. Just need to check with Herman on the proto message.

.builder(groupedDs.df.logicalPlan.output)
.asInstanceOf[PythonUDF]

groupedDs
Contributor

Nit: besides initialStateDs, it's the same for both cases. Can we use the if-else clause just to set initialStateDs and move this part out of the clause?

var initialStateDs = null

if (rel.hasInitialInput) {
    initialStateDs = ...
}

groupedDs
        .transformWithStateInPandas(
          Column(pythonUdf),
          outputSchema,
          twsInfo.getOutputMode,
          twsInfo.getTimeMode,
          initialStateDs,
          twsInfo.getEventTimeColumnName)
        .logicalPlan

Contributor

It's not only initialStateDs but the UDF as well, so it does not seem to help much.

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1, but I would like to get approval from @hvanhovell for the proto change.

// Additional input parameters used for TransformWithState operator.
message TransformWithStateInfo {
// (Required) Time mode string for transformWithState.
string time_mode = 1;
Contributor

Should this be an enum?

Contributor Author

Answered in the comment below about output mode: https://github.com/apache/spark/pull/49560/files#r1962978619

Contributor

I can't find that comment.

Contributor Author

The above link points to this thread:
[screenshot of the linked comment thread]

string time_mode = 1;

// (Required) Output mode string for transformWithState.
string output_mode = 2;
Contributor

Should this be an enum?

Contributor

What is the difference between this output mode and the one in the GroupMap message?

Contributor Author

@jingz-db jingz-db Feb 20, 2025

What is the difference between this output mode and the one in the GroupMap message?

TransformWithState has its own dedicated Scala TimeMode class, which differs from the time-mode class used by GroupMap, so we need a separate field wrapped under TransformWithStateInfo. The output mode, however, is the same class, so I will remove the output_mode entry here and let TransformWithState reuse output_mode from GroupMap.

Should this be an enum?

For Python, time mode and output mode are user-input strings, and they are only converted back into the corresponding Scala classes on the JVM. So I think it is better to reuse the existing methods for parsing the user-input strings into the Scala classes and keep the string format here.
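To make the flow concrete, a sketch of the user-facing call (parameter names taken from this PR's signature; MyProcessor is a hypothetical StatefulProcessor). The two strings travel through the proto unchanged and are parsed into the Scala TimeMode/OutputMode on the server:

result_df = df.groupBy("id").transformWithStateInPandas(
    statefulProcessor=MyProcessor(),                     # hypothetical processor
    outputStructType="id string, countAsString string",  # Python-only output schema
    outputMode="Update",                                 # parsed on the JVM
    timeMode="processingTime",                           # parsed on the JVM
)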

Column(transformExpression(expr)))

val initialStateDs = Dataset
.ofRows(session, transformRelation(rel.getInitialInput))
Contributor

In general please avoid doing this in the SparkConnectPlanner. This is very expensive to do, and we don't really track this well (as in the metrics might be off).

Contributor Author

Is there any recommended way to reconstruct a RelationalGroupedDataset in the Connect world? I see Dataset.ofRows() used in multiple places in SparkConnectPlanner.scala; e.g., coGroupMap does the same to reconstruct a RelationalGroupedDataset: https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L791

Essentially we are doing the same thing for initialStateDs here; it is a user-input parameter of type RelationalGroupedDataset.

Contributor

I am aware it is used in multiple places in the planner. That does not mean it should be encouraged. We want to get rid of most of this after Spark 4.0.

Contributor Author

@jingz-db jingz-db Feb 25, 2025

Thanks for the comments! Feel free to ping me again to make changes once Connect has a recommended approach for this.

@jingz-db jingz-db requested a review from hvanhovell February 20, 2025 09:06
Contributor

@hvanhovell hvanhovell left a comment

LGTM

Contributor Author

jingz-db commented Feb 25, 2025

@HeartSaVioR
Contributor

HeartSaVioR commented Feb 26, 2025

Thanks for your patience! I see that both @hvanhovell and I approved, which is great for covering both areas.

Thanks! Merging to master. (Let me couple the Spark Connect merge with the Scala one; if we can make it for Scala in 4.0, I'll port this back to 4.0 as well.)

Pajaraja pushed a commit to Pajaraja/spark that referenced this pull request Mar 6, 2025
kazemaksOG pushed a commit to kazemaksOG/spark-custom-scheduler that referenced this pull request Mar 27, 2025

Closes apache#49560 from jingz-db/python-tws-connect.

Lead-authored-by: jingz-db <[email protected]>
Co-authored-by: Jing Zhan <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>