[SPARK-16931][PYTHON] PySpark APIs for bucketBy and sortBy
GregBowyer committed Aug 22, 2016
1 parent 929cb8b commit f49b9a2
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -501,6 +501,46 @@ def partitionBy(self, *cols):
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self

    @since(2.1)
    def bucketBy(self, numBuckets, *cols):
        """Buckets the output by the given columns on the file system.

        :param numBuckets: the number of buckets to save
        :param cols: names of columns

        >>> (df.write.format('parquet')
        ...     .bucketBy(100, 'year', 'month')
        ...     .saveAsTable('bucketed_table'))
        """
        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
            cols = cols[0]

        # The JVM DataFrameWriter.bucketBy takes the first column separately
        # from the remaining varargs, so split the column list accordingly.
        col = cols[0]
        cols = cols[1:]

        self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
        return self

    @since(2.1)
    def sortBy(self, *cols):
        """Sorts the output in each bucket by the given columns on the file system.

        :param cols: names of columns

        >>> (df.write.format('parquet')
        ...     .bucketBy(100, 'year', 'month')
        ...     .sortBy('day')
        ...     .saveAsTable('sorted_bucketed_table'))
        """
        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
            cols = cols[0]

        # As in bucketBy, the JVM API takes the first column separately from
        # the remaining varargs, so split the column list accordingly.
        col = cols[0]
        cols = cols[1:]

        self._jwrite = self._jwrite.sortBy(col, _to_seq(self._spark._sc, cols))
        return self

@since(1.4)
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
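As a usage note (not part of the commit itself): the sketch below shows how the two new writer methods chain with the existing `format`, `mode`, and `saveAsTable` calls. It assumes a running SparkSession; the DataFrame contents and table name are illustrative. Bucketing metadata lives in the metastore, which is why the doctests above persist via `saveAsTable` rather than `save`.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bucketing-demo").getOrCreate()

# Hypothetical example data; column names match the doctests above.
df = spark.createDataFrame(
    [(2016, 8, 22, 10.0), (2016, 8, 23, 12.5)],
    ["year", "month", "day", "amount"])

# Bucket by (year, month) into 100 buckets, sort rows within each
# bucket by day, and persist as a managed table.
(df.write.format("parquet")
    .bucketBy(100, "year", "month")
    .sortBy("day")
    .mode("overwrite")
    .saveAsTable("sorted_bucketed_table"))
```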
