From 01fe01f309705af0670013508a40c2178166145b Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Sun, 3 Apr 2016 16:08:32 +0800 Subject: [PATCH 01/23] create quantile4cols --- .../spark/sql/DataFrameStatFunctions.scala | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 89c3a74f4f067..59a40b3ead09d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -74,6 +74,40 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { Seq(col), probabilities, relativeError).head.toArray } + /** + * Calculates the approximate quantiles of numerical columns of a DataFrame. + * + * The result of this algorithm has the following deterministic bound: + * If the DataFrame has N elements and if we request the quantile at probability `p` up to error + * `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank + * of `x` is close to (p * N). + * More precisely, + * + * floor((p - err) * N) <= rank(x) <= ceil((p + err) * N). + * + * This method implements a variation of the Greenwald-Khanna algorithm (with some speed + * optimizations). + * The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient + * Online Computation of Quantile Summaries]] by Greenwald and Khanna. + * + * @param cols the names of the numerical columns + * @param probabilities a list of quantile probabilities + * Each number must belong to [0, 1]. + * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError The relative target precision to achieve (>= 0). + * If set to zero, the exact quantiles are computed, which could be very expensive. + * Note that values greater than 1 are accepted but give the same result as 1. 
+ * @return the approximate quantiles at the given probabilities + * + * @since 2.0.0 + */ + def approxQuantile( + cols: Array[String], + probabilities: Array[Double], + relativeError: Double): Array[Double] = { + StatFunctions.multipleApproxQuantiles(df, cols, probabilities, relativeError).head.toArray + } + /** * Python-friendly version of [[approxQuantile()]] */ From 9cdb7cf86852c51f282491ee46376a6f2e2e4772 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Sun, 3 Apr 2016 16:11:04 +0800 Subject: [PATCH 02/23] fix --- .../org/apache/spark/sql/DataFrameStatFunctions.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 59a40b3ead09d..c564f96005ada 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -97,15 +97,16 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param relativeError The relative target precision to achieve (>= 0). * If set to zero, the exact quantiles are computed, which could be very expensive. * Note that values greater than 1 are accepted but give the same result as 1. 
- * @return the approximate quantiles at the given probabilities + * @return the approximate quantiles at the given probabilities of each column * * @since 2.0.0 */ def approxQuantile( cols: Array[String], probabilities: Array[Double], - relativeError: Double): Array[Double] = { - StatFunctions.multipleApproxQuantiles(df, cols, probabilities, relativeError).head.toArray + relativeError: Double): Array[Array[Double]] = { + StatFunctions.multipleApproxQuantiles(df, cols, probabilities, relativeError) + .map(_.toArray).toArray } /** From 3dacbca8de99937d0068781bc3e4328fb232c851 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Wed, 6 Apr 2016 14:31:40 +0800 Subject: [PATCH 03/23] add python api --- python/pyspark/sql/dataframe.py | 17 +++++++++++++--- .../spark/sql/DataFrameStatFunctions.scala | 20 +++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 10e42d0f9d322..00cc69032b791 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1375,8 +1375,16 @@ def approxQuantile(self, col, probabilities, relativeError): accepted but give the same result as 1. 
:return: the approximate quantiles at the given probabilities """ - if not isinstance(col, str): - raise ValueError("col should be a string.") + if not isinstance(col, (str, list, tuple)): + raise ValueError("col should be a string or list or tuple.") + if isinstance(col, str): + col = [col] + if isinstance(col, tuple): + col = list(col) + for c in col: + if not isinstance(col, str): + raise ValueError("columns should be strings.") + col = _to_list(self._sc, col) if not isinstance(probabilities, (list, tuple)): raise ValueError("probabilities should be a list or tuple") @@ -1391,7 +1399,10 @@ def approxQuantile(self, col, probabilities, relativeError): raise ValueError("relativeError should be numerical (float, int, long) >= 0.") relativeError = float(relativeError) - jaq = self._jdf.stat().approxQuantile(col, probabilities, relativeError) + jaq = self._jdf.stat().approxQuantileMultiCols(col, probabilities, relativeError) + jaq = list(jaq) + if len(jaq) == 1: + return list(jaq[0]) return list(jaq) @since(1.4) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index c564f96005ada..1e2892dc6b606 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -74,6 +74,16 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { Seq(col), probabilities, relativeError).head.toArray } + /** + * Python-friendly version of [[approxQuantile()]] + */ + private[spark] def approxQuantile( + col: String, + probabilities: List[Double], + relativeError: Double): java.util.List[Double] = { + approxQuantile(col, probabilities.toArray, relativeError).toList.asJava + } + /** * Calculates the approximate quantiles of numerical columns of a DataFrame. 
* @@ -109,14 +119,16 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { .map(_.toArray).toArray } + /** * Python-friendly version of [[approxQuantile()]] */ - private[spark] def approxQuantile( - col: String, + private[spark] def approxQuantileMultiCols( + cols: List[String], probabilities: List[Double], - relativeError: Double): java.util.List[Double] = { - approxQuantile(col, probabilities.toArray, relativeError).toList.asJava + relativeError: Double): java.util.List[java.util.List[Double]] = { + approxQuantile(cols.toArray, probabilities.toArray, relativeError) + .map(_.toList.asJava).toList.asJava } /** From e2a340bbcd78b77f7c3f9ada5626e93dcd202fff Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Wed, 6 Apr 2016 15:20:59 +0800 Subject: [PATCH 04/23] add python test --- python/pyspark/sql/dataframe.py | 15 +++++++++------ python/pyspark/sql/tests.py | 13 ++++++++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 00cc69032b791..1792735626fad 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1365,7 +1365,8 @@ def approxQuantile(self, col, probabilities, relativeError): Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna. - :param col: the name of the numerical column + :param col: the name of the numerical column, or a list/tuple of + numerical columns. :param probabilities: a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum. 
@@ -1399,11 +1400,13 @@ def approxQuantile(self, col, probabilities, relativeError): raise ValueError("relativeError should be numerical (float, int, long) >= 0.") relativeError = float(relativeError) - jaq = self._jdf.stat().approxQuantileMultiCols(col, probabilities, relativeError) - jaq = list(jaq) - if len(jaq) == 1: - return list(jaq[0]) - return list(jaq) + jaqs = self._jdf.stat().approxQuantileMultiCols(col, probabilities, relativeError) + res = [] + for jaq in jaqs: + res.append(list(jaq)) + if len(res) == 1: + return res[0] + return res @since(1.4) def corr(self, col1, col2, method=None): diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a88e5a1cfb3c9..83634d49a8980 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -874,12 +874,23 @@ def test_first_last_ignorenulls(self): self.assertEqual([Row(a=None, b=1, c=None, d=98)], df3.collect()) def test_approxQuantile(self): - df = self.sc.parallelize([Row(a=i) for i in range(10)]).toDF() + df = self.sc.parallelize([Row(a=i, b=i+10) for i in range(10)]).toDF() aq = df.stat.approxQuantile("a", [0.1, 0.5, 0.9], 0.1) self.assertTrue(isinstance(aq, list)) self.assertEqual(len(aq), 3) self.assertTrue(all(isinstance(q, float) for q in aq)) + aqs = df.stat.approxQuantile(["a","b"], [0.1, 0.5, 0.9], 0.1) + self.assertTrue(isinstance(aqs, list)) + self.assertEqual(len(aqs), 2) + self.assertTrue(isinstance(aqs[0], list)) + self.assertEqual(len(aqs[0]), 3) + self.assertTrue(all(isinstance(q, float) for q in aqs[0])) + self.assertTrue(isinstance(aqs[1], list)) + self.assertEqual(len(aqs[1]), 3) + self.assertTrue(all(isinstance(q, float) for q in aqs[1])) + + def test_corr(self): import math df = self.sc.parallelize([Row(a=i, b=math.sqrt(i)) for i in range(10)]).toDF() From f0ab669f276eba338af679828ed12a91b0b1b15f Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Wed, 6 Apr 2016 15:27:33 +0800 Subject: [PATCH 05/23] fix py style --- python/pyspark/sql/tests.py | 4 +--- 1 
file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 83634d49a8980..e0c70d44572a3 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -879,8 +879,7 @@ def test_approxQuantile(self): self.assertTrue(isinstance(aq, list)) self.assertEqual(len(aq), 3) self.assertTrue(all(isinstance(q, float) for q in aq)) - - aqs = df.stat.approxQuantile(["a","b"], [0.1, 0.5, 0.9], 0.1) + aqs = df.stat.approxQuantile(["a", "b"], [0.1, 0.5, 0.9], 0.1) self.assertTrue(isinstance(aqs, list)) self.assertEqual(len(aqs), 2) self.assertTrue(isinstance(aqs[0], list)) @@ -890,7 +889,6 @@ def test_approxQuantile(self): self.assertEqual(len(aqs[1]), 3) self.assertTrue(all(isinstance(q, float) for q in aqs[1])) - def test_corr(self): import math df = self.sc.parallelize([Row(a=i, b=math.sqrt(i)) for i in range(10)]).toDF() From ea7dcddfd1b62547dbe16413e40ceb42a64e747e Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 8 Apr 2016 11:18:50 +0800 Subject: [PATCH 06/23] fix py test bug --- python/pyspark/sql/dataframe.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 1792735626fad..6240dfe342625 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1378,12 +1378,14 @@ def approxQuantile(self, col, probabilities, relativeError): """ if not isinstance(col, (str, list, tuple)): raise ValueError("col should be a string or list or tuple.") + isStr = False if isinstance(col, str): + isStr = True col = [col] if isinstance(col, tuple): col = list(col) for c in col: - if not isinstance(col, str): + if not isinstance(c, str): raise ValueError("columns should be strings.") col = _to_list(self._sc, col) @@ -1404,7 +1406,7 @@ def approxQuantile(self, col, probabilities, relativeError): res = [] for jaq in jaqs: res.append(list(jaq)) - if len(res) == 1: + if isStr: return res[0] return 
res From b7c787fe6a0bf24cde5713ea4c63d5a4066c4dd9 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 8 Apr 2016 14:15:12 +0800 Subject: [PATCH 07/23] update doc --- python/pyspark/sql/dataframe.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 6240dfe342625..88f9b58f3d756 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1348,7 +1348,7 @@ def replace(self, to_replace, value, subset=None): @since(2.0) def approxQuantile(self, col, probabilities, relativeError): """ - Calculates the approximate quantiles of a numerical column of a + Calculates the approximate quantiles of numerical columns of a DataFrame. The result of this algorithm has the following deterministic bound: @@ -1374,7 +1374,11 @@ def approxQuantile(self, col, probabilities, relativeError): (>= 0). If set to zero, the exact quantiles are computed, which could be very expensive. Note that values greater than 1 are accepted but give the same result as 1. - :return: the approximate quantiles at the given probabilities + :return: the approximate quantiles at the given probabilities. If + the input `col` is a string, the output is a list of float. If the + input `col` is a list or tuple of strings, the output is also a + list, but each element in it is a list of float (list of list of + float). 
""" if not isinstance(col, (str, list, tuple)): raise ValueError("col should be a string or list or tuple.") @@ -1403,9 +1407,7 @@ def approxQuantile(self, col, probabilities, relativeError): relativeError = float(relativeError) jaqs = self._jdf.stat().approxQuantileMultiCols(col, probabilities, relativeError) - res = [] - for jaq in jaqs: - res.append(list(jaq)) + res = [list(jaq) for jaq in jaqs] if isStr: return res[0] return res From 355b97951f2ba180e33b530f3dff20252b981982 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Mon, 18 Apr 2016 19:15:16 +0800 Subject: [PATCH 08/23] update api, add test --- python/pyspark/sql/dataframe.py | 26 ++++++++++--------- .../spark/sql/DataFrameStatFunctions.scala | 2 +- .../apache/spark/sql/DataFrameStatSuite.scala | 9 +++++++ 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 88f9b58f3d756..cb3e1dd360f35 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1377,21 +1377,23 @@ def approxQuantile(self, col, probabilities, relativeError): :return: the approximate quantiles at the given probabilities. If the input `col` is a string, the output is a list of float. If the input `col` is a list or tuple of strings, the output is also a - list, but each element in it is a list of float (list of list of - float). + list, but each element in it is a list of float, i.e., the output + is a list of list of float. 
""" if not isinstance(col, (str, list, tuple)): - raise ValueError("col should be a string or list or tuple.") + raise ValueError("col should be a string, list or tuple.") + isStr = False if isinstance(col, str): isStr = True - col = [col] + if isinstance(col, tuple): col = list(col) - for c in col: - if not isinstance(c, str): - raise ValueError("columns should be strings.") - col = _to_list(self._sc, col) + if isinstance(col, list): + for c in col: + if not isinstance(c, str): + raise ValueError("columns should be strings.") + col = _to_list(self._sc, col) if not isinstance(probabilities, (list, tuple)): raise ValueError("probabilities should be a list or tuple") @@ -1406,11 +1408,11 @@ def approxQuantile(self, col, probabilities, relativeError): raise ValueError("relativeError should be numerical (float, int, long) >= 0.") relativeError = float(relativeError) - jaqs = self._jdf.stat().approxQuantileMultiCols(col, probabilities, relativeError) - res = [list(jaq) for jaq in jaqs] + jaq = self._jdf.stat().approxQuantile(col, probabilities, relativeError) if isStr: - return res[0] - return res + return list(jaq) + else: + return [list(j) for j in jaq] @since(1.4) def corr(self, col1, col2, method=None): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 1e2892dc6b606..c698f27b4133a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -123,7 +123,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { /** * Python-friendly version of [[approxQuantile()]] */ - private[spark] def approxQuantileMultiCols( + private[spark] def approxQuantile( cols: List[String], probabilities: List[Double], relativeError: Double): java.util.List[java.util.List[Double]] = { diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 1383208874a19..b1c12a177af14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -149,6 +149,15 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { assert(math.abs(s2 - q2 * n) < error_single) assert(math.abs(d1 - 2 * q1 * n) < error_double) assert(math.abs(d2 - 2 * q2 * n) < error_double) + + // Multiple columns + val Array(Array(ms1, ms2), Array(md1, md2)) = + df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), epsilon) + + assert(math.abs(ms1 - q1 * n) < error_single) + assert(math.abs(ms2 - q2 * n) < error_single) + assert(math.abs(md1 - 2 * q1 * n) < error_double) + assert(math.abs(md2 - 2 * q2 * n) < error_double) } // test approxQuantile on NaN values val dfNaN = Seq(Double.NaN, 1.0, Double.NaN, Double.NaN).toDF("input") From bb3d751207e2682142d7fd7ce16fc63703bba0b3 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Wed, 20 Apr 2016 23:08:23 +0800 Subject: [PATCH 09/23] fix one nit --- python/pyspark/sql/dataframe.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index cb3e1dd360f35..c92b3aa74e22a 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1383,9 +1383,7 @@ def approxQuantile(self, col, probabilities, relativeError): if not isinstance(col, (str, list, tuple)): raise ValueError("col should be a string, list or tuple.") - isStr = False - if isinstance(col, str): - isStr = True + isStr = isinstance(col, str) if isinstance(col, tuple): col = list(col) From 31f384197e91cd33db5963a435432b2b36f7f2a8 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 4 Aug 2016 12:52:36 +0800 Subject: [PATCH 10/23] update version --- 
.../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index c698f27b4133a..4a2fa9b60dcfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -109,7 +109,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * Note that values greater than 1 are accepted but give the same result as 1. * @return the approximate quantiles at the given probabilities of each column * - * @since 2.0.0 + * @since 2.1.0 */ def approxQuantile( cols: Array[String], From 5321237a1e8e9b9290b12b9caf5a8544398b1b55 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Wed, 5 Oct 2016 11:05:38 +0800 Subject: [PATCH 11/23] update doc --- python/pyspark/sql/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c92b3aa74e22a..89f7e7bfe7e19 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1365,8 +1365,8 @@ def approxQuantile(self, col, probabilities, relativeError): Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna. - :param col: the name of the numerical column, or a list/tuple of - numerical columns. + :param col: str, list. + Can be a single column name, or a list of names for multiple columns. :param probabilities: a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum. 
From d858e2f30d823b4a7d866897615542d5e5b3f837 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Wed, 5 Oct 2016 11:21:01 +0800 Subject: [PATCH 12/23] del unused private api --- .../org/apache/spark/sql/DataFrameStatFunctions.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 4a2fa9b60dcfe..e9e79d3765fc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -74,16 +74,6 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { Seq(col), probabilities, relativeError).head.toArray } - /** - * Python-friendly version of [[approxQuantile()]] - */ - private[spark] def approxQuantile( - col: String, - probabilities: List[Double], - relativeError: Double): java.util.List[Double] = { - approxQuantile(col, probabilities.toArray, relativeError).toList.asJava - } - /** * Calculates the approximate quantiles of numerical columns of a DataFrame. * From 4a5404efbcc2593041b303886e8cdeb0a74a5730 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 6 Oct 2016 09:11:23 +0800 Subject: [PATCH 13/23] add versionchanged --- python/pyspark/sql/dataframe.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 89f7e7bfe7e19..4da090f7e3cd0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1379,7 +1379,13 @@ def approxQuantile(self, col, probabilities, relativeError): input `col` is a list or tuple of strings, the output is also a list, but each element in it is a list of float, i.e., the output is a list of list of float. + + .. versionchanged:: 2.1 + Added support for multiple columns. 
""" + + + if not isinstance(col, (str, list, tuple)): raise ValueError("col should be a string, list or tuple.") From c4dc404f7a5b3f340c51ba1e6b095283d85d5f1a Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 6 Oct 2016 09:11:36 +0800 Subject: [PATCH 14/23] add versionchanged --- python/pyspark/sql/dataframe.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 4da090f7e3cd0..425e11d3416f1 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1384,8 +1384,6 @@ def approxQuantile(self, col, probabilities, relativeError): Added support for multiple columns. """ - - if not isinstance(col, (str, list, tuple)): raise ValueError("col should be a string, list or tuple.") From 191b54a3b7d70e1caae4f7aaa33ac0bf430af2fb Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 6 Oct 2016 16:01:25 +0800 Subject: [PATCH 15/23] fix bug --- python/pyspark/sql/dataframe.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 425e11d3416f1..e9faed1d090a8 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1391,11 +1391,13 @@ def approxQuantile(self, col, probabilities, relativeError): if isinstance(col, tuple): col = list(col) - if isinstance(col, list): - for c in col: - if not isinstance(c, str): - raise ValueError("columns should be strings.") - col = _to_list(self._sc, col) + elif isinstance(col, str): + col = [col] + + for c in col: + if not isinstance(c, str): + raise ValueError("columns should be strings.") + col = _to_list(self._sc, col) if not isinstance(probabilities, (list, tuple)): raise ValueError("probabilities should be a list or tuple") @@ -1412,7 +1414,7 @@ def approxQuantile(self, col, probabilities, relativeError): jaq = self._jdf.stat().approxQuantile(col, probabilities, relativeError) if isStr: - return list(jaq) + return 
[list(j) for j in jaq][0] else: return [list(j) for j in jaq] From b1f318de2511e9a3ba4c4ea7bbe7f372e2890aff Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 4 Nov 2016 20:54:22 +0800 Subject: [PATCH 16/23] del duplicate doc; deal nan --- python/pyspark/sql/dataframe.py | 8 ++++---- .../spark/sql/DataFrameStatFunctions.scala | 19 ++++--------------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e9faed1d090a8..f684de7041b87 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1365,6 +1365,8 @@ def approxQuantile(self, col, probabilities, relativeError): Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna. + Note that rows containing any null values will be removed before calculation. + :param col: str, list. Can be a single column name, or a list of names for multiple columns. :param probabilities: a list of quantile probabilities @@ -1413,10 +1415,8 @@ def approxQuantile(self, col, probabilities, relativeError): relativeError = float(relativeError) jaq = self._jdf.stat().approxQuantile(col, probabilities, relativeError) - if isStr: - return [list(j) for j in jaq][0] - else: - return [list(j) for j in jaq] + jaq_list = [list(j) for j in jaq] + return jaq_list[0] if isStr else jaq_list @since(1.4) def corr(self, col1, col2, method=None): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index e9e79d3765fc1..5094cd1cbe0d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.stat._ +import 
org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch} @@ -77,19 +78,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { /** * Calculates the approximate quantiles of numerical columns of a DataFrame. * - * The result of this algorithm has the following deterministic bound: - * If the DataFrame has N elements and if we request the quantile at probability `p` up to error - * `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank - * of `x` is close to (p * N). - * More precisely, - * - * floor((p - err) * N) <= rank(x) <= ceil((p + err) * N). - * - * This method implements a variation of the Greenwald-Khanna algorithm (with some speed - * optimizations). - * The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient - * Online Computation of Quantile Summaries]] by Greenwald and Khanna. - * + * Note that rows containing any null values will be removed before calculation. * @param cols the names of the numerical columns * @param probabilities a list of quantile probabilities * Each number must belong to [0, 1]. 
@@ -105,8 +94,8 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { cols: Array[String], probabilities: Array[Double], relativeError: Double): Array[Array[Double]] = { - StatFunctions.multipleApproxQuantiles(df, cols, probabilities, relativeError) - .map(_.toArray).toArray + StatFunctions.multipleApproxQuantiles(df.select(cols.map(col): _*).na.drop(), cols, + probabilities, relativeError).map(_.toArray).toArray } From 6ad7a26697197c3ec9df136cafe7d77b7bce82bd Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Sat, 5 Nov 2016 12:39:57 +0800 Subject: [PATCH 17/23] add refer; add multi nan test --- .../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 5 ++++- .../scala/org/apache/spark/sql/DataFrameStatSuite.scala | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 5094cd1cbe0d2..7879a437daafe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -77,8 +77,11 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { /** * Calculates the approximate quantiles of numerical columns of a DataFrame. + * @see [[DataFrameStatFunctions.approxQuantile(col:Str* approxQuantile]] for + * detailed description. + * - * Note that rows containing any null values will be removed before calculation. + * Note that rows containing any null or NaN values will be removed before + * calculation. * @param cols the names of the numerical columns * @param probabilities a list of quantile probabilities * Each number must belong to [0, 1]. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index b1c12a177af14..f52b18e27b5f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -163,6 +163,12 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { val dfNaN = Seq(Double.NaN, 1.0, Double.NaN, Double.NaN).toDF("input") val resNaN = dfNaN.stat.approxQuantile("input", Array(q1, q2), epsilons.head) assert(resNaN.count(_.isNaN) === 0) + // test approxQuantile on multi-column NaN values + val dfNaN2 = Seq((Double.NaN, 1.0), (1.0, 1.0), (-1.0, Double.NaN), (Double.NaN, Double.NaN)) + .toDF("input1", "input2") + val resNaN2 = dfNaN2.stat.approxQuantile(Array("input1", "input2"), + Array(q1, q2), epsilons.head) + assert(resNaN2.flatten.count(_.isNaN) === 0) } test("crosstab") { From 6d0c915fee16565e698875c11951ae16139c8926 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Mon, 21 Nov 2016 17:08:26 +0800 Subject: [PATCH 18/23] fix_conflict --- .../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 7879a437daafe..871894be6f85a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -91,6 +91,9 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * Note that values greater than 1 are accepted but give the same result as 1. 
* @return the approximate quantiles at the given probabilities of each column * + * @note Rows containing any NaN values will be removed from the numerical column + * before calculation + * * @since 2.1.0 */ def approxQuantile( From c68ad7851f99cddc73fe677b5abd42dadd7965a9 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Mon, 21 Nov 2016 17:09:07 +0800 Subject: [PATCH 19/23] fix_conflict --- .../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 871894be6f85a..47b8b820d8e73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -91,8 +91,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * Note that values greater than 1 are accepted but give the same result as 1. 
* @return the approximate quantiles at the given probabilities of each column * - * @note Rows containing any NaN values will be removed from the numerical column - * before calculation + * @note Rows containing any NaN values will be removed before calculation * * @since 2.1.0 */ From 0542b42f9395c425d31c124fbf5676a5e6af4eeb Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Tue, 3 Jan 2017 15:31:39 +0800 Subject: [PATCH 20/23] update version --- python/pyspark/sql/dataframe.py | 2 +- .../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index f684de7041b87..9d5f5548110fb 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1382,7 +1382,7 @@ def approxQuantile(self, col, probabilities, relativeError): list, but each element in it is a list of float, i.e., the output is a list of list of float. - .. versionchanged:: 2.1 + .. versionchanged:: 2.2 Added support for multiple columns. 
""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 47b8b820d8e73..42b1ee47d9c90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -93,7 +93,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * * @note Rows containing any NaN values will be removed before calculation * - * @since 2.1.0 + * @since 2.2.0 */ def approxQuantile( cols: Array[String], From 7bb3e40af5a858d143d0553862b8f577fef23ca4 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 5 Jan 2017 13:00:35 +0800 Subject: [PATCH 21/23] state_incorrect_type --- python/pyspark/sql/dataframe.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 9d5f5548110fb..5902586c6d649 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -16,7 +16,6 @@ # import sys -import warnings import random if sys.version >= '3': @@ -1387,7 +1386,7 @@ def approxQuantile(self, col, probabilities, relativeError): """ if not isinstance(col, (str, list, tuple)): - raise ValueError("col should be a string, list or tuple.") + raise ValueError("col should be a string, list or tuple, but got %r" % type(col)) isStr = isinstance(col, str) @@ -1398,7 +1397,7 @@ def approxQuantile(self, col, probabilities, relativeError): for c in col: if not isinstance(c, str): - raise ValueError("columns should be strings.") + raise ValueError("columns should be strings, but got %r" % type(c)) col = _to_list(self._sc, col) if not isinstance(probabilities, (list, tuple)): From 29a691f85bf9c94eedce9fe8c98dc9d3a83d01ea Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 27 Jan 2017 17:18:48 +0800 Subject: [PATCH 22/23] update py tests --- python/pyspark/sql/tests.py | 12 
++++++++++++ 1 file changed, 12 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index e0c70d44572a3..0b12e2914e2c0 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -888,6 +888,18 @@ def test_approxQuantile(self): self.assertTrue(isinstance(aqs[1], list)) self.assertEqual(len(aqs[1]), 3) self.assertTrue(all(isinstance(q, float) for q in aqs[1])) + aqt = df.stat.approxQuantile(("a", "b"), [0.1, 0.5, 0.9], 0.1) + self.assertTrue(isinstance(aqt, list)) + self.assertEqual(len(aqt), 2) + self.assertTrue(isinstance(aqt[0], list)) + self.assertEqual(len(aqt[0]), 3) + self.assertTrue(all(isinstance(q, float) for q in aqt[0])) + self.assertTrue(isinstance(aqt[1], list)) + self.assertEqual(len(aqt[1]), 3) + self.assertTrue(all(isinstance(q, float) for q in aqt[1])) + self.assertRaises(ValueError, lambda: df.stat.approxQuantile(123, [0.1, 0.9], 0.1)) + self.assertRaises(ValueError, lambda: df.stat.approxQuantile(("a", 123), [0.1, 0.9], 0.1)) + self.assertRaises(ValueError, lambda: df.stat.approxQuantile(["a", 123], [0.1, 0.9], 0.1)) def test_corr(self): import math From ccf4d8d078e733fb0a2346686e851fee95a9d582 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 1 Feb 2017 14:42:22 +0800 Subject: [PATCH 23/23] float->floats --- python/pyspark/sql/dataframe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5902586c6d649..50373b8585195 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1376,10 +1376,10 @@ def approxQuantile(self, col, probabilities, relativeError): could be very expensive. Note that values greater than 1 are accepted but give the same result as 1. :return: the approximate quantiles at the given probabilities. If - the input `col` is a string, the output is a list of float. If the + the input `col` is a string, the output is a list of floats. 
If the input `col` is a list or tuple of strings, the output is also a - list, but each element in it is a list of float, i.e., the output - is a list of list of float. + list, but each element in it is a list of floats, i.e., the output + is a list of list of floats. .. versionchanged:: 2.2 Added support for multiple columns.