[SPARK-16931][PYTHON][SQL] Add Python wrapper for bucketBy #17077
Conversation
python/pyspark/sql/readwriter.py
Outdated
@@ -545,6 +545,55 @@ def partitionBy(self, *cols):
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self

    @since(2.1)
Maybe it should be 2.2 :)
python/pyspark/sql/tests.py
Outdated
# Test write with one bucketing column
df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
self.assertEqual(
    len([c for c in self.spark.catalog.listColumns("pyspark_bucket") if c.name == "x" and c.isBucket]),
Oh, BTW, I assume this exceeds the 100-character length limit?
Indeed.
Test build #73504 has finished for PR 17077 at commit
Test build #73522 has finished for PR 17077 at commit
Test build #73523 has finished for PR 17077 at commit
Test build #73527 has finished for PR 17077 at commit
Force-pushed from e2bad95 to ae93166.
Test build #73535 has finished for PR 17077 at commit
This looks like an important improvement that might make sense to try to get in for 2.2, so I'll try to get some reviewing in.
First quick pass through, thanks for working on this :)
python/pyspark/sql/readwriter.py
Outdated
@@ -545,6 +545,57 @@ def partitionBy(self, *cols):
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self

    @since(2.2)
    def bucketBy(self, numBuckets, *cols):
        """Buckets the output by the given columns on the file system.
So the bucketBy description in the scaladoc is a bit more in-depth; you might just want to copy that.
Also, generally our style for multi-line docstrings is to have the opening """ on its own line.
Regarding style, I had a similar exchange with @jkbradley lately (#17218 (review)). If a single convention is desired, I believe it should be documented and the remaining docstrings should be adjusted. Personally I am indifferent, though PEP 8 and PEP 257 seem to prefer this convention over placing the opening quotes on a separate line.

"you might just want to copy that."

Do you mean this? I wonder if we should rather document that it is allowed only with saveAsTable. What do you think?
Both

"""
...
"""

and

"""...
"""

comply with PEP 8 for multi-line docstrings to my knowledge, although I don't think a specific way has been preferred in this case. (Just as a personal taste, I prefer the first one.)
I think just copying it from Scala doc is good enough to prevent overhead of sweeping the documentation when we start to support other operations later.
python/pyspark/sql/readwriter.py
Outdated
    @since(2.2)
    def sortBy(self, *cols):
        """Sorts the output in each bucket by the given columns on the file system.
Same comment as above with regards to the docstring
python/pyspark/sql/tests.py
Outdated
df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket") | ||
self.assertEqual( | ||
len([c for c in self.spark.catalog.listColumns("pyspark_bucket") | ||
if c.name == "x" and c.isBucket]), |
BTW, maybe we should break this into multiple lines (or simply use another variable for this list comprehension) if more commits are pushed. It seems hard to read.
We can simplify this to

catalog = self.spark.catalog
sum(c.name == "x" and c.isBucket for c in catalog.listColumns("pyspark_bucket"))

if you think this is more readable, but I am not convinced that it makes sense to use a separate variable here. We have a few tests like this, they don't care about the sequence itself, and I think it would only pollute the scope. But if you have strong feelings about it I am happy to adjust it.
Regarding the comment style... Right now in readwriter (excluding bucketBy and sortBy) we have:

- 23 docstrings that open with """ followed by text on the same line
- 7 docstrings that put the opening """ on its own line

As you said both are valid, but if we want to keep only one convention it would be a good idea to adjust the whole module.
I am fine with it. I don't feel strongly about either.
Thanks for taking a look at the related ones and trying it out.
Test build #75658 has finished for PR 17077 at commit
Test build #75659 has finished for PR 17077 at commit
Those are all from me, @zero323. It looks quite good to me except for a few comments I left.
python/pyspark/sql/tests.py
Outdated
df.write.bucketBy(3, "x", "y").mode("overwrite").saveAsTable("pyspark_bucket") | ||
self.assertEqual( | ||
len([c for c in self.spark.catalog.listColumns("pyspark_bucket") | ||
if c.name in ("x", "y") and c.isBucket]), |
I am sorry. What do you think about something like the one below?
cols = self.spark.catalog.listColumns("pyspark_bucket")
num = len([c for c in cols if c.name in ("x", "y") and c.isBucket])
self.assertEqual(num, 2)
If you think it is better I'll trust your judgment. But let's keep it DRY and use a helper.
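For illustration, a minimal sketch of such a helper (the name count_bucketed_cols and the default table name are hypothetical; it reuses the listColumns/isBucket check already used in these tests and would live on the test class):

def count_bucketed_cols(self, names, table="pyspark_bucket"):
    """Return how many of the given column names are bucketing columns of the table."""
    cols = self.spark.catalog.listColumns(table)
    return len([c for c in cols if c.name in names and c.isBucket])

A test could then assert, e.g., self.assertEqual(self.count_bucketed_cols(["x", "y"]), 2).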
Copying docs from the Scala docs directly could be confusing since we won't support this in 2.0 and 2.1, and changes since 2.0 don't really affect us here.
Thank you for taking my opinion into account. Yea, we should remove or change the version. I meant following the rest of the contents.
Generally, the documentation contents have been kept matched among APIs in different languages, to my knowledge. I don't think this is a must, but I think it is safer to avoid any blame in the future and confusion for the users.
I have seen several minor PRs fixing documentation (e.g., typos) that had to be fixed identically for the APIs in other languages, and I also made some PRs to match the documentation, e.g., #17429
# Test write with bucket and sort with multiple columns
(df.write.bucketBy(2, "x")
    .sortBy("y", "z")
    .mode("overwrite").saveAsTable("pyspark_bucket"))
@zero323, should we drop the table before or after this test?
I don't think that dropping before is necessary. We overwrite on each write and name clashes are unlikely.
We can drop it after the tests, but I am not sure how to do it right. SQLTests is overgrown and I am not sure if we should add tearDown only for this, but adding DROP TABLE in the test itself doesn't look right either.
Test build #75666 has finished for PR 17077 at commit
Test build #75667 has finished for PR 17077 at commit
python/pyspark/sql/tests.py
Outdated
.mode("overwrite").saveAsTable("pyspark_bucket")) | ||
self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect())) | ||
|
||
self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket") |
Yea, I think this is a correct way to drop the table.
If we're going to drop the table here we should probably put it in a finally block.
@holdenk Do you suggest adding tearDown? I thought about it, but right now the tests are so inflated (sadly not much support for SPARK-19224) that it would be completely detached from the context.
On the other hand, adding an artificial try ... finally seems wrong.
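If a tearDown were added, a minimal sketch could look like the following (assuming SQLTests follows the usual unittest pattern and that pyspark_bucket is the only table these tests create; whether this belongs in SQLTests at all is exactly the open question above):

def tearDown(self):
    super(SQLTests, self).tearDown()
    # Clean up the table written by the bucketing tests, if it exists.
    self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")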
(I think we need @holdenk's sign-off and further review.)
Thanks for helping with the review @HyukjinKwon :)
One minor comment but otherwise looking in very good shape.
@holdenk, @HyukjinKwon Do we retarget this to 2.3?
I think we should, because branch-2.2 has been cut.
🙁
Sure, but I'll need some guidance here. Somewhere in the Generic Load/Save Functions, right? But I guess we'll need a separate section for that. And should probably document
Test build #76545 has finished for PR 17077 at commit
:param cols: additional names (optional). If `col` is a list it should be empty.

.. note:: Applicable for file-based data sources in combination with
          :py:meth:`DataFrameWriter.saveAsTable`.
This is not accurate. We can also use save to store the bucketed tables without saving their metadata in the metastore.
@gatorsmile Can we?
➜ spark git:(master) git rev-parse HEAD
2cf83c47838115f71419ba5b9296c69ec1d746cd
➜ spark git:(master) bin/spark-shell
Spark context Web UI available at http://192.168.1.101:4041
Spark context available as 'sc' (master = local[*], app id = local-1494184109262).
Spark session available as 'spark'.
Welcome to Spark version 2.3.0-SNAPSHOT
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)

scala> Seq(("a", 1, 3)).toDF("x", "y", "z").write.bucketBy(3, "x", "y").format("parquet").save("/tmp/foo")
org.apache.spark.sql.AnalysisException: 'save' does not support bucketing right now;
  at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:231)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
  ... 48 elided
Uh, yes. Bucket info is not part of the file/directory names, unlike partitioning info. You can create a new section to explain how to create bucketed tables.
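For reference, a PySpark sketch of the same distinction (run in a pyspark shell where spark is defined; the table name is arbitrary, and the exception message comes from the Scala transcript above):

df = spark.createDataFrame([("a", 1, 3)], ["x", "y", "z"])

# Supported: bucketing metadata is recorded in the table's metastore entry.
df.write.bucketBy(3, "x", "y").mode("overwrite").saveAsTable("bucketed_table")

# Not supported: bucket info is not part of the file/directory names, and plain
# save() has no metastore entry to record it in, so this raises
# AnalysisException: 'save' does not support bucketing right now;
df.write.bucketBy(3, "x", "y").format("parquet").save("/tmp/foo")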
@@ -563,6 +563,63 @@ def partitionBy(self, *cols):
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self

    @since(2.3)
    def bucketBy(self, numBuckets, col, *cols):
        """Buckets the output by the given columns.If specified,
Nit: columns.If -> columns. If
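For reference, a minimal sketch of how the final wrappers could look, following the partitionBy plumbing shown earlier and the signature in this diff; the exact docstring wording and argument validation in the merged code may differ:

@since(2.3)
def bucketBy(self, numBuckets, col, *cols):
    """Buckets the output by the given columns. If specified,
    the output is laid out on the file system similar to Hive's bucketing scheme.

    :param numBuckets: the number of buckets to save
    :param col: a name of a column, or a list of names.
    :param cols: additional names (optional). If `col` is a list it should be empty.

    .. note:: Applicable for file-based data sources in combination with
              :py:meth:`DataFrameWriter.saveAsTable`.
    """
    # Accept either bucketBy(n, "x", "y") or bucketBy(n, ["x", "y"]).
    if isinstance(col, (list, tuple)):
        if cols:
            raise ValueError("col is a {0} but cols are not empty".format(type(col)))
        col, cols = col[0], col[1:]
    # Delegate to the JVM writer, as partitionBy does, and return self for chaining.
    self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
    return self

@since(2.3)
def sortBy(self, col, *cols):
    """Sorts the output in each bucket by the given columns on the file system.

    :param col: a name of a column, or a list of names.
    :param cols: additional names (optional). If `col` is a list it should be empty.
    """
    if isinstance(col, (list, tuple)):
        if cols:
            raise ValueError("col is a {0} but cols are not empty".format(type(col)))
        col, cols = col[0], col[1:]
    self._jwrite = self._jwrite.sortBy(col, _to_seq(self._spark._sc, cols))
    return self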
LGTM
The SQL document update can be a separate PR. Thanks for your work!
LGTM too.
thanks, merging to master!
What changes were proposed in this pull request?

Adds Python wrappers for DataFrameWriter.bucketBy and DataFrameWriter.sortBy (SPARK-16931).

How was this patch tested?

Unit tests covering new feature.

Note: Based on work of @GregBowyer (f49b9a2)

CC @HyukjinKwon
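A short usage sketch of the new API, mirroring the tests above (run in a PySpark shell where spark is defined; the table name is arbitrary):

df = spark.createDataFrame([("a", 1, 3), ("b", 2, 4)], ["x", "y", "z"])

# Bucket by x into 2 buckets, sort each bucket by y and z, and persist as a table.
(df.write.bucketBy(2, "x")
    .sortBy("y", "z")
    .mode("overwrite")
    .saveAsTable("pyspark_bucket"))

# The bucketing columns are visible through the catalog.
print([c.name for c in spark.catalog.listColumns("pyspark_bucket") if c.isBucket])  # ['x']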