
[SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect #40525

Closed · 26 commits

Conversation

@itholic (Contributor) commented Mar 22, 2023

What changes were proposed in this pull request?

This PR proposes to support the pandas API on Spark for Spark Connect. It includes minimal changes to support the basic functionality of the pandas API on Spark in Spark Connect, and sets up a testing environment under python/pyspark/pandas/tests/connect that reuses all existing pandas-on-Spark test bases to verify the pandas API on Spark in a remote Spark session.

Here is a summary of the key tasks:

  1. All pandas-on-Spark tests under the python/pyspark/pandas/tests/ directory can now be performed in Spark Connect by adding corresponding tests to the python/pyspark/pandas/tests/connect/ directory.
  2. Unlike with Spark SQL, we did not create a separate package directory such as python/pyspark/sql/connect for Spark Connect, so I modified the existing files of pyspark.pandas. This allows users to use the existing pandas-on-Spark code as it is on Spark Connect.
  3. Because of 2, I added two typing aliases to python/pyspark/pandas/_typing.py so that both the PySpark and Spark Connect classes can be handled in a single code path (a rough sketch follows this list).
    • Added GenericColumn for typing both PySpark Column and Spark Connect Column.
    • Added GenericDataFrame for typing both PySpark DataFrame and Spark Connect DataFrame.
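
For illustration, a minimal sketch of what these aliases could look like, assuming simple Union aliases; the exact definitions in python/pyspark/pandas/_typing.py may differ:

```python
# Rough sketch based on the description above, not the exact code from this PR.
from typing import Union

from pyspark.sql.column import Column as PySparkColumn
from pyspark.sql.dataframe import DataFrame as PySparkDataFrame
from pyspark.sql.connect.column import Column as ConnectColumn
from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame

# Either the classic or the Spark Connect class is accepted wherever these aliases are used.
GenericColumn = Union[PySparkColumn, ConnectColumn]
GenericDataFrame = Union[PySparkDataFrame, ConnectDataFrame]
```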

Why are the changes needed?

Supporting the pandas API on Spark in Spark Connect significantly improves usability for existing PySpark and pandas users.

Does this PR introduce any user-facing change?

No, because it is designed to allow existing code written for a regular Spark session to be used without any user-facing changes other than switching the regular Spark session to a remote Spark session. However, since some features of the existing pandas API on Spark are not fully supported yet, some functionality may be limited.
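
For example, switching the session is the only change required; this is an illustrative snippet (not from this PR), and the connection URL is an assumption:

```python
# Existing pandas-on-Spark code stays the same; only the session becomes remote.
from pyspark.sql import SparkSession
import pyspark.pandas as ps

# Assumed local Spark Connect endpoint; adjust to your environment.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

psdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
print(psdf.sum())
```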

How was this patch tested?

A testing bed has been set up to reproduce all existing pandas-on-Spark tests for Spark Connect, ensuring that the existing tests can be replicated against a remote session. The current results for all tests are as follows:

| Test file | Tests total | Tests passed | Coverage |
|---|---:|---:|---:|
| test_parity_dataframe.py | 105 | 85 | 80.95% |
| test_parity_dataframe_slow.py | 66 | 48 | 72.73% |
| test_parity_dataframe_conversion.py | 11 | 11 | 100.00% |
| test_parity_dataframe_spark_io.py | 8 | 7 | 87.50% |
| test_parity_ops_on_diff_frames.py | 75 | 75 | 100.00% |
| test_parity_series.py | 131 | 104 | 79.39% |
| test_parity_series_datetime.py | 41 | 34 | 82.93% |
| test_parity_categorical.py | 29 | 22 | 75.86% |
| test_parity_config.py | 7 | 7 | 100.00% |
| test_parity_csv.py | 18 | 18 | 100.00% |
| test_parity_default_index.py | 4 | 1 | 25.00% |
| test_parity_ewm.py | 3 | 1 | 33.33% |
| test_parity_expanding.py | 22 | 2 | 9.09% |
| test_parity_extention.py | 7 | 7 | 100.00% |
| test_parity_frame_spark.py | 6 | 2 | 33.33% |
| test_parity_generic_functions.py | 4 | 1 | 25.00% |
| test_parity_groupby.py | 49 | 36 | 73.47% |
| test_parity_groupby_slow.py | 205 | 147 | 71.71% |
| test_parity_indexing.py | 3 | 3 | 100.00% |
| test_parity_indexops_spark.py | 3 | 3 | 100.00% |
| test_parity_internal.py | 1 | 0 | 0.00% |
| test_parity_namespace.py | 29 | 26 | 89.66% |
| test_parity_numpy_compat.py | 6 | 4 | 66.67% |
| test_parity_ops_on_diff_frames_groupby.py | 22 | 13 | 59.09% |
| test_parity_ops_on_diff_frames_groupby_expanding.py | 7 | 0 | 0.00% |
| test_parity_ops_on_diff_frames_groupby_rolling.py | 7 | 0 | 0.00% |
| test_parity_ops_on_diff_frames_slow.py | 22 | 15 | 68.18% |
| test_parity_repr.py | 5 | 5 | 100.00% |
| test_parity_resample.py | 5 | 3 | 60.00% |
| test_parity_reshape.py | 10 | 8 | 80.00% |
| test_parity_rolling.py | 21 | 1 | 4.76% |
| test_parity_scalars.py | 1 | 1 | 100.00% |
| test_parity_series_conversion.py | 2 | 2 | 100.00% |
| test_parity_series_string.py | 56 | 55 | 98.21% |
| test_parity_spark_functions.py | 1 | 1 | 100.00% |
| test_parity_sql.py | 7 | 4 | 57.14% |
| test_parity_stats.py | 15 | 7 | 46.67% |
| test_parity_typedef.py | 10 | 10 | 100.00% |
| test_parity_utils.py | 5 | 5 | 100.00% |
| test_parity_window.py | 2 | 2 | 100.00% |
| test_parity_frame_plot.py | 7 | 5 | 71.43% |
| plot/test_parity_frame_plot_matplotlib.py | 13 | 11 | 84.62% |
| plot/test_parity_frame_plot_plotly.py | 12 | 9 | 75.00% |
| plot/test_parity_series_plot.py | 3 | 3 | 100.00% |
| plot/test_parity_series_plot_matplotlib.py | 14 | 8 | 57.14% |
| plot/test_parity_series_plot_plotly.py | 9 | 7 | 77.78% |
| indexes/test_parity_base.py | 144 | 75 | 52.08% |
| indexes/test_parity_category.py | 16 | 7 | 43.75% |
| indexes/test_parity_datetime.py | 13 | 11 | 84.62% |
| indexes/test_parity_timedelta.py | 2 | 1 | 50.00% |
| data_type_ops/test_parity_base.py | 2 | 2 | 100.00% |
| data_type_ops/test_parity_binary_ops.py | 30 | 25 | 83.33% |
| data_type_ops/test_parity_boolean_ops.py | 31 | 26 | 83.87% |
| data_type_ops/test_parity_categorical_ops.py | 30 | 23 | 76.67% |
| data_type_ops/test_parity_complex_ops.py | 30 | 30 | 100.00% |
| data_type_ops/test_parity_date_ops.py | 30 | 25 | 83.33% |
| data_type_ops/test_parity_datetime_ops.py | 30 | 25 | 83.33% |
| data_type_ops/test_parity_null_ops.py | 26 | 19 | 73.08% |
| data_type_ops/test_parity_num_ops.py | 33 | 25 | 75.76% |
| data_type_ops/test_parity_string_ops.py | 30 | 23 | 76.67% |
| data_type_ops/test_parity_timedelta_ops.py | 26 | 19 | 73.08% |
| data_type_ops/test_parity_udf_ops.py | 26 | 18 | 69.23% |
| Total | 1588 | 1173 | 73.87% |
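
For reference, each parity test reuses an existing pandas-on-Spark test base and runs it against a remote session. A hypothetical sketch is shown below; the imported class and module names are illustrative assumptions, not necessarily the exact ones added by this PR:

```python
# Hypothetical parity test: reuse an existing pandas-on-Spark test base,
# but run it on a Spark Connect (remote) session instead of a classic one.
# DataFrameTest and ReusedConnectTestCase are assumed names for illustration.
import unittest

from pyspark.pandas.tests.test_dataframe import DataFrameTest
from pyspark.testing.connectutils import ReusedConnectTestCase


class DataFrameParityTests(DataFrameTest, ReusedConnectTestCase):
    """Runs the existing DataFrame tests against a remote Spark session."""


if __name__ == "__main__":
    unittest.main()
```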

@itholic (Contributor, Author) commented Mar 22, 2023

The remaining task at hand is to address numerous mypy annotation issues. If you have any good ideas for resolving the linter errors, please feel free to let me know at any time :-)

@itholic changed the title [WIP][SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect Mar 30, 2023
@itholic marked this pull request as ready for review March 30, 2023 06:34
@itholic (Contributor, Author) commented Mar 30, 2023

CI passed. cc @HyukjinKwon @ueshin @xinrong-meng @zhengruifeng PTAL when you find some time.

I summarized the key changes in the PR description for review.

Review threads on dev/sparktestsupport/modules.py and python/pyspark/pandas/utils.py (resolved).
@HyukjinKwon (Member) commented:

Merged to master.

@bjornjorgensen (Contributor) commented:

@itholic Thank you, great work :)

After this PR
from pyspark import pandas as ps

ModuleNotFoundError Traceback (most recent call last)
File /opt/spark/python/pyspark/sql/connect/utils.py:45, in require_minimum_grpc_version()
44 try:
---> 45 import grpc
46 except ImportError as error:

ModuleNotFoundError: No module named 'grpc'

The above exception was the direct cause of the following exception:

ImportError Traceback (most recent call last)
Cell In[1], line 11
9 import pyarrow
10 from pyspark import SparkConf, SparkContext
---> 11 from pyspark import pandas as ps
12 from pyspark.sql import SparkSession
13 from pyspark.sql.functions import col, concat, concat_ws, expr, lit, trim

File /opt/spark/python/pyspark/pandas/__init__.py:59
50 warnings.warn(
51 "'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to "
52 "set this environment variable to '1' in both driver and executor sides if you use "
(...)
55 "already launched."
56 )
57 os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
---> 59 from pyspark.pandas.frame import DataFrame
60 from pyspark.pandas.indexes.base import Index
61 from pyspark.pandas.indexes.category import CategoricalIndex

File /opt/spark/python/pyspark/pandas/frame.py:88
85 from pyspark.sql.window import Window
87 from pyspark import pandas as ps # For running doctests and reference resolution in PyCharm.
---> 88 from pyspark.pandas._typing import (
89 Axis,
90 DataFrameOrSeries,
91 Dtype,
92 Label,
93 Name,
94 Scalar,
95 T,
96 GenericColumn,
97 )
98 from pyspark.pandas.accessors import PandasOnSparkFrameMethods
99 from pyspark.pandas.config import option_context, get_option

File /opt/spark/python/pyspark/pandas/_typing.py:25
22 from pandas.api.extensions import ExtensionDtype
24 from pyspark.sql.column import Column as PySparkColumn
---> 25 from pyspark.sql.connect.column import Column as ConnectColumn
26 from pyspark.sql.dataframe import DataFrame as PySparkDataFrame
27 from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame

File /opt/spark/python/pyspark/sql/connect/column.py:19
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one or more
3 # contributor license agreements. See the NOTICE file distributed with
(...)
15 # limitations under the License.
16 #
17 from pyspark.sql.connect.utils import check_dependencies
---> 19 check_dependencies(__name__)
21 import datetime
22 import decimal

File /opt/spark/python/pyspark/sql/connect/utils.py:35, in check_dependencies(mod_name)
33 require_minimum_pandas_version()
34 require_minimum_pyarrow_version()
---> 35 require_minimum_grpc_version()

File /opt/spark/python/pyspark/sql/connect/utils.py:47, in require_minimum_grpc_version()
45 import grpc
46 except ImportError as error:
---> 47 raise ImportError(
48 "grpc >= %s must be installed; however, " "it was not found." % minimum_grpc_version
49 ) from error
50 if LooseVersion(grpc.__version__) < LooseVersion(minimum_grpc_version):
51 raise ImportError(
52 "gRPC >= %s must be installed; however, "
53 "your version was %s." % (minimum_grpc_version, grpc.__version__)
54 )

ImportError: grpc >= 1.48.1 must be installed; however, it was not found.

pip install grpc

Collecting grpc
Downloading grpc-1.0.0.tar.gz (5.2 kB)
Preparing metadata (setup.py) ... error
error: subprocess-exited-with-error

× python setup.py egg_info did not run successfully.
│ exit code: 1
╰─> [6 lines of output]
Traceback (most recent call last):
File "", line 2, in
File "", line 34, in
File "/tmp/pip-install-vp4d8s4c/grpc_c0f1992ad8f7456b8ac09ecbaeb81750/setup.py", line 33, in
raise RuntimeError(HINT)
RuntimeError: Please install the official package with: pip install grpcio
[end of output]

note: This error originates from a subprocess, and is likely not a problem with pip.
error: metadata-generation-failed

× Encountered error while generating package metadata.
╰─> See above for output.

note: This is an issue with the package mentioned above, not pip.
hint: See above for details.
Note: you may need to restart the kernel to use updated packages.

After pip install grpcio, it works.

I don't think every pandas user who tries the pandas API on Spark will use Spark Connect. So can we change this back?

@itholic (Contributor, Author) commented Apr 9, 2023

Thank you for the feedback, @bjornjorgensen !

IMHO, it seems more reasonable to add grpcio as a dependency for the pandas API on Spark instead of reverting this whole change (oh, by the way, it seems you already opened #40716 to fix the existing issue 😄).

From my understanding, the purpose of Spark Connect is to let users run an existing PySpark project through a remote client without any code changes. Therefore, if a user is using the pyspark.pandas module in their existing code, it should work the same way through the remote client as well.

So I think we should support as much PySpark functionality as possible, including the pandas API on Spark, since at this point nobody can be sure whether existing PySpark users, not only existing pandas users, will use the pandas API on Spark through Spark Connect.

Alternatively, we could create a completely separate package path for the pandas API on Spark on Spark Connect. This would allow the existing pandas API on Spark to be used without installing grpcio, but IMHO that would be much more overhead than simply changing the policy to add one package as an additional installation.

WDYT? also cc @HyukjinKwon @grundprinzip @ueshin @zhengruifeng FYI

@HyukjinKwon (Member) commented:

The problem is that you don't use Spark Connect, but it complains that it needs grpcio.

@HyukjinKwon (Member) commented:

For example, you can't just do `from pyspark.sql.connect.column import Column as ConnectColumn` in pyspark/pandas/_typing.py. Even if you don't use Spark Connect, it will check the dependency. Can we avoid this?

@itholic (Contributor, Author) commented Apr 9, 2023

Got it. Then I think we need to modify the current code to import ConnectColumn only when is_remote() is True, across all code paths. For example:

Before

```python
from pyspark.sql.column import Column as PySparkColumn
from pyspark.sql.connect.column import Column as ConnectColumn
from pyspark.sql.utils import is_remote


def example():
    # The module-level import of ConnectColumn above triggers the Spark Connect
    # dependency check (grpcio) even when Spark Connect is not used.
    Column = ConnectColumn if is_remote() else PySparkColumn
    return Column.__eq__
```

After

```python
from pyspark.sql.column import Column as PySparkColumn
from pyspark.sql.utils import is_remote


def example():
    if is_remote():
        # Import lazily so grpcio is only required when an actual
        # Spark Connect (remote) session is in use.
        from pyspark.sql.connect.column import Column as ConnectColumn

        Column = ConnectColumn
    else:
        Column = PySparkColumn
    return Column.__eq__
```

Also, we should remove GenericColumn and GenericDataFrame.

Can you think of a better solution? As of now, I haven't come up with a better way than this.

Thanks!

@bjornjorgensen (Contributor) commented:

"The problem is that you don't use Spark Connect but it complains that it needs grpcio"
Yes, this is correct :)
#40716 was for a typo so users of connect will know how to install the package.

The is_remote() can probably work.
Perhaps we can look at how pyspark did solve this?

@grundprinzip (Contributor) commented:

We need to be careful about changing the imports too early or too late. The way the is_remote() functionality works is by checking an environment variable; if it's not set during the import, setting it later yields undesired consequences.

Is it so hard to add the dependency on grpc when using the pandas API?

What are we achieving with this? gRPC is a stable protocol and not a random library. It's available on all platforms.

What's the benefit of trying this pure approach?
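
To illustrate the point about import timing: is_remote() essentially boils down to an environment-variable check, so evaluating it at module-import time, before the variable is set, bakes in the wrong answer. A rough sketch, assuming the Spark 3.4-era behavior (the real implementation in pyspark.sql.utils may differ):

```python
import os


def is_remote() -> bool:
    # Sketch only: Spark Connect mode is signalled via an environment variable,
    # so the answer depends on whatever is set at the moment this is evaluated.
    return "SPARK_CONNECT_MODE_ENABLED" in os.environ
```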

@bjornjorgensen (Contributor) commented:

One of the key things with the pandas API on Spark is that users only have to change pd to ps in the import. But now they also have to install gRPC.

Does this PR introduce any user-facing change?
Yes: every pandas API on Spark user needs to install gRPC.

@grundprinzip (Contributor) commented:

That's only partially true; they still have to install PySpark. If they pip install pyspark[connect], the dependencies are properly resolved.

@grundprinzip (Contributor) commented:

Actually, even today pandas-on-Spark users have to install specific dependencies that are not part of the default PySpark installation.

See https://github.com/apache/spark/blob/master/python/setup.py

Now, the quickest fix is to simply add grpcio to the pandas-on-Spark dependency list.

Given that we're already forcing pandas users into specific Java versions etc., adding grpcio does not make it worse, in my opinion.

The user surface remains the same.
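
Concretely, that quick fix would amount to something like the following sketch of the pandas-on-Spark extra in python/setup.py; the package pins and layout here are assumptions for illustration, not the actual file contents:

```python
# Hypothetical sketch of adding grpcio to the pandas-on-Spark extra in python/setup.py.
_minimum_grpc_version = "1.48.1"  # assumed to match the Spark Connect client requirement

extras_require = {
    "pandas_on_spark": [
        "pandas>=1.0.5",   # assumed existing pins
        "pyarrow>=1.0.0",
        "numpy>=1.15",
        "grpcio>=" + _minimum_grpc_version,  # the proposed addition
    ],
}
```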

@bjornjorgensen (Contributor) commented:

The modules that the pandas API on Spark needs are pandas, pyarrow, and numpy, which every pandas user already has installed.
And now they also have to add connect, grpcio, grpcio-status, and googleapis-common-protos.

Why does the pandas API on Spark need such tight coupling to the connect module?

@grundprinzip (Contributor) commented:

IIUC only grpcio is needed. The rest is localized to the Spark Connect client.

@HyukjinKwon (Member) commented:

Is it so hard to add the dependency on grpc when using the pandas API?

It's not super hard. But it's a bit odd to add this dependency for the pandas API on Spark alone. We should probably think about adding grpcio as a hard dependency for the whole PySpark project, but definitely not for the pandas API on Spark alone.

What are we achieving with this? gRPC is a stable protocol and not a random library. It's available on all platforms.
What's the benefit of trying this pure approach?

So, for the current status, we're trying to add only the dependencies that each module needs, so users won't have to install unnecessary dependencies. In addition, adding the dependency breaks existing applications when they migrate from 3.4 to 3.5. It matters when PySpark is installed without pip (which is actually the official release channel of Apache Spark).

@HyukjinKwon (Member) commented:

So my proposal is to keep it consistent for now (that is, not requiring grpcio when we don't use Spark Connect), and separately discuss whether we should switch it to a hard required dependency.

@itholic (Contributor, Author) commented Apr 10, 2023

We should probably think about adding grpcio as a hard dependency for the whole PySpark project, but definitely not for the pandas API on Spark alone

I think this point is particularly crucial, even though all of your opinions make sense to me.
To summarize, I will make a fix following the suggestion in #40525 (comment) so that the pandas API on Spark can work without grpcio for now. Then we can discuss adding the grpcio dependency to the entire PySpark project if needed in the future.
Thank you for all the feedback!

@@ -778,6 +778,68 @@ def __hash__(self):
# ml unittests
"pyspark.ml.tests.connect.test_connect_function",
"pyspark.ml.tests.connect.test_parity_torch_distributor",
# pandas-on-Spark unittests
A reviewer (Contributor) commented on dev/sparktestsupport/modules.py:

pyspark-connect takes 2~3 hours after adding the PS-related unit tests; I guess we can add new modules pyspark-connect-pandas and pyspark-connect-pandas-slow. @itholic

@itholic (Contributor, Author) replied:

Let me address this in a follow-up PR. Thanks!

@itholic deleted the initial_pandas_connect branch April 22, 2023 05:47
zhengruifeng added a commit that referenced this pull request Aug 2, 2023
### What changes were proposed in this pull request?
add missing UTs

### Why are the changes needed?
the two UTs were missing in #40525

### Does this PR introduce _any_ user-facing change?
no. test-only

### How was this patch tested?
updated CI

Closes #42262 from zhengruifeng/ps_test_ewm.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
zhengruifeng added a commit that referenced this pull request Aug 2, 2023 (the same change, cherry picked from commit 6891f2c).
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024 (the same change).