From f49b9a23468f7af32cb53d2b654272757c151725 Mon Sep 17 00:00:00 2001
From: Greg Bowyer
Date: Fri, 5 Aug 2016 17:53:30 -0700
Subject: [PATCH] [SPARK-16931][PYTHON] PySpark APIs for bucketBy and sortBy

---
 python/pyspark/sql/readwriter.py | 44 ++++++++++++++++++++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 64de33e8ec0a8..050954e43b023 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -501,6 +501,50 @@ def partitionBy(self, *cols):
         self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
         return self
 
+    @since(2.1)
+    def bucketBy(self, numBuckets, *cols):
+        """Buckets the output by the given columns on the file system.
+
+        :param numBuckets: the number of buckets to save
+        :param cols: names of columns
+
+        >>> (df.write.format('parquet')  # doctest: +SKIP
+        ...     .bucketBy(100, 'year', 'month')
+        ...     .mode('overwrite')
+        ...     .saveAsTable('bucketed_table'))
+        """
+        # Accept both bucketBy(n, 'a', 'b') and bucketBy(n, ['a', 'b']).
+        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+            cols = cols[0]
+        if len(cols) == 0:
+            raise ValueError("at least one bucketing column must be specified")
+
+        col, cols = cols[0], cols[1:]
+        self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
+        return self
+
+    @since(2.1)
+    def sortBy(self, *cols):
+        """Sorts the output in each bucket by the given columns on the file system.
+
+        :param cols: names of columns
+
+        >>> (df.write.format('parquet')  # doctest: +SKIP
+        ...     .bucketBy(100, 'year', 'month')
+        ...     .sortBy('day')
+        ...     .mode('overwrite')
+        ...     .saveAsTable('sorted_bucketed_table'))
+        """
+        # Accept both sortBy('a', 'b') and sortBy(['a', 'b']).
+        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+            cols = cols[0]
+        if len(cols) == 0:
+            raise ValueError("at least one sorting column must be specified")
+
+        col, cols = cols[0], cols[1:]
+        self._jwrite = self._jwrite.sortBy(col, _to_seq(self._spark._sc, cols))
+        return self
+
     @since(1.4)
     def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
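
Reviewer note (not part of the patch): a minimal end-to-end sketch of the API
added above. The session bootstrap, the demo DataFrame, and the table name
`demo_bucketed` are illustrative assumptions only. bucketBy/sortBy take effect
through saveAsTable rather than plain save(), since save() has no catalog in
which to record the bucketing metadata, and sortBy is only meaningful in
combination with bucketBy.

    # Usage sketch only; names below are assumptions for illustration.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("bucketby-demo").getOrCreate()

    # Synthesize a small frame with month/day columns to bucket and sort on.
    df = spark.range(0, 1000).select(
        F.col("id"),
        (F.col("id") % 12).alias("month"),
        (F.col("id") % 28).alias("day"))

    # Bucket rows by month into 8 buckets, sorting within each bucket by day;
    # the bucket spec is recorded in the catalog entry for the table.
    (df.write.format("parquet")
        .bucketBy(8, "month")
        .sortBy("day")
        .mode("overwrite")
        .saveAsTable("demo_bucketed"))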