Skip to content

Commit

Permalink
[SPARK-46082][PYTHON][CONNECT] Fix protobuf string representation for…
Browse files Browse the repository at this point in the history
… Pandas Functions API with Spark Connect

### What changes were proposed in this pull request?

This PR proposes to rename `_func` to `_function` in the plan classes for the Pandas Functions API with Spark Connect so the string representation includes the function (see also apache#39223).

### Why are the changes needed?

In order to have a pretty string format for protobuf messages on the Python side.

### Does this PR introduce _any_ user-facing change?

Yes,

```bash
./bin/pyspark --remote local
```

```python
df = spark.range(1)
print(df.mapInPandas(lambda x: x, df.schema)._plan.print())
```

**Before:**
```
<MapPartitions is_barrier='False'>
  <Range start='0', end='1', step='1', num_partitions='None'>
```

**After:**

```
<MapPartitions function='<lambda>(id)', is_barrier='False'>
  <Range start='0', end='1', step='1', num_partitions='None'>
```

### How was this patch tested?

Manually tested as above.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43991 from HyukjinKwon/fix-print-proto.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
HyukjinKwon committed Nov 24, 2023
1 parent 6c7ebe8 commit cd1ec48
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
2 changes: 0 additions & 2 deletions python/pyspark/sql/connect/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,15 +378,13 @@ def applyInPandas(
evalType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
)

all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
return DataFrame.withPlan(
plan.CoGroupMap(
input=self._gd1._df._plan,
input_grouping_cols=self._gd1._grouping_cols,
other=self._gd2._df._plan,
other_grouping_cols=self._gd2._grouping_cols,
function=udf_obj,
cols=all_cols,
),
session=self._gd1._df._session,
)
Expand Down
17 changes: 8 additions & 9 deletions python/pyspark/sql/connect/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2208,14 +2208,14 @@ def __init__(
) -> None:
super().__init__(child)

self._func = function._build_common_inline_user_defined_function(*cols)
self._function = function._build_common_inline_user_defined_function(*cols)
self._is_barrier = is_barrier

def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
plan = self._create_proto_relation()
plan.map_partitions.input.CopyFrom(self._child.plan(session))
plan.map_partitions.func.CopyFrom(self._func.to_plan_udf(session))
plan.map_partitions.func.CopyFrom(self._function.to_plan_udf(session))
plan.map_partitions.is_barrier = self._is_barrier
return plan

Expand All @@ -2234,7 +2234,7 @@ def __init__(

super().__init__(child)
self._grouping_cols = grouping_cols
self._func = function._build_common_inline_user_defined_function(*cols)
self._function = function._build_common_inline_user_defined_function(*cols)

def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
Expand All @@ -2243,7 +2243,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
plan.group_map.grouping_expressions.extend(
[c.to_plan(session) for c in self._grouping_cols]
)
plan.group_map.func.CopyFrom(self._func.to_plan_udf(session))
plan.group_map.func.CopyFrom(self._function.to_plan_udf(session))
return plan


Expand All @@ -2257,7 +2257,6 @@ def __init__(
other: Optional["LogicalPlan"],
other_grouping_cols: Sequence[Column],
function: "UserDefinedFunction",
cols: List[Column],
):
assert isinstance(input_grouping_cols, list) and all(
isinstance(c, Column) for c in input_grouping_cols
Expand All @@ -2272,7 +2271,7 @@ def __init__(
self._other = cast(LogicalPlan, other)
# The function takes entire DataFrame as inputs, no need to do
# column binding (no input columns).
self._func = function._build_common_inline_user_defined_function()
self._function = function._build_common_inline_user_defined_function()

def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
Expand All @@ -2285,7 +2284,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
plan.co_group_map.other_grouping_expressions.extend(
[c.to_plan(session) for c in self._other_grouping_cols]
)
plan.co_group_map.func.CopyFrom(self._func.to_plan_udf(session))
plan.co_group_map.func.CopyFrom(self._function.to_plan_udf(session))
return plan


Expand All @@ -2307,7 +2306,7 @@ def __init__(

super().__init__(child)
self._grouping_cols = grouping_cols
self._func = function._build_common_inline_user_defined_function(*cols)
self._function = function._build_common_inline_user_defined_function(*cols)
self._output_schema = output_schema
self._state_schema = state_schema
self._output_mode = output_mode
Expand All @@ -2320,7 +2319,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
plan.apply_in_pandas_with_state.grouping_expressions.extend(
[c.to_plan(session) for c in self._grouping_cols]
)
plan.apply_in_pandas_with_state.func.CopyFrom(self._func.to_plan_udf(session))
plan.apply_in_pandas_with_state.func.CopyFrom(self._function.to_plan_udf(session))
plan.apply_in_pandas_with_state.output_schema = self._output_schema
plan.apply_in_pandas_with_state.state_schema = self._state_schema
plan.apply_in_pandas_with_state.output_mode = self._output_mode
Expand Down

0 comments on commit cd1ec48

Please sign in to comment.