From 18253947397608f0f46a66a630f96b5ee8ce93c0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 4 Sep 2024 08:13:32 -0400 Subject: [PATCH 01/35] Add NullTreatment enum wrapper and add filter option to approx_distinct --- python/datafusion/common.py | 15 +++++- python/datafusion/expr.py | 4 +- python/datafusion/functions.py | 50 ++++++++++++------- python/datafusion/tests/test_aggregation.py | 7 +++ .../datafusion/tests/test_wrapper_coverage.py | 6 +++ src/functions.rs | 6 ++- 6 files changed, 66 insertions(+), 22 deletions(-) diff --git a/python/datafusion/common.py b/python/datafusion/common.py index 225e3330..7db8333f 100644 --- a/python/datafusion/common.py +++ b/python/datafusion/common.py @@ -17,13 +17,13 @@ """Common data types used throughout the DataFusion project.""" from ._internal import common as common_internal +from enum import Enum # TODO these should all have proper wrapper classes DFSchema = common_internal.DFSchema DataType = common_internal.DataType DataTypeMap = common_internal.DataTypeMap -NullTreatment = common_internal.NullTreatment PythonType = common_internal.PythonType RexType = common_internal.RexType SqlFunction = common_internal.SqlFunction @@ -47,3 +47,16 @@ "SqlStatistics", "SqlFunction", ] + + +class NullTreatment(Enum): + """Describe how null values are to be treated by functions. + + This is used primarily by aggregate and window functions. It can be set on + these functions using the builder approach described in + ref:`_window_functions` and ref:`_aggregation` in the online documentation. + + """ + + RESPECT_NULLS = common_internal.NullTreatment.RESPECT_NULLS + IGNORE_NULLS = common_internal.NullTreatment.IGNORE_NULLS diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 7fa60803..bd6a86fb 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -473,7 +473,7 @@ def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder: set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when ``build()`` is called. """ - return ExprFuncBuilder(self.expr.null_treatment(null_treatment)) + return ExprFuncBuilder(self.expr.null_treatment(null_treatment.value)) def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder: """Set the partitioning for a window function. @@ -518,7 +518,7 @@ def distinct(self) -> ExprFuncBuilder: def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder: """Set how nulls are treated for either window or aggregate functions.""" - return ExprFuncBuilder(self.builder.null_treatment(null_treatment)) + return ExprFuncBuilder(self.builder.null_treatment(null_treatment.value)) def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder: """Set partitioning for window functions.""" diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 97b4fe1d..a2620edc 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -18,9 +18,10 @@ from __future__ import annotations -from datafusion._internal import functions as f, common +from datafusion._internal import functions as f, expr as expr_internal from datafusion.expr import CaseBuilder, Expr, WindowFrame from datafusion.context import SessionContext +from datafusion.common import NullTreatment from typing import Any, Optional @@ -258,6 +259,12 @@ ] +def expr_list_to_raw_expr_list( + expr_list: Optional[list[Expr]], +) -> Optional[list[expr_internal.Expr]]: + return [e.expr for e in expr_list] if expr_list is not None else None + + def isnan(expr: Expr) -> Expr: """Returns true if a given number is +NaN or -NaN otherwise returns false.""" return Expr(f.isnan(expr.expr)) @@ -400,8 +407,8 @@ def window( df.select(functions.lag(col("a")).partition_by(col("b")).build()) """ args = [a.expr for a in args] - partition_by = [e.expr for e in partition_by] if partition_by is not None else None - order_by = [o.expr for o in order_by] if order_by is not None else None + partition_by = expr_list_to_raw_expr_list(partition_by) + order_by = expr_list_to_raw_expr_list(order_by) window_frame = window_frame.window_frame if window_frame is not None else None return Expr(f.window(name, args, partition_by, order_by, window_frame, ctx)) @@ -1486,9 +1493,14 @@ def flatten(array: Expr) -> Expr: # aggregate functions -def approx_distinct(expression: Expr) -> Expr: +def approx_distinct( + expression: Expr, + filter: Optional[Expr] = None, +) -> Expr: """Returns the approximate number of distinct values.""" - return Expr(f.approx_distinct(expression.expr)) + filter_raw = filter.expr if filter is not None else None + + return Expr(f.approx_distinct(expression.expr, filter=filter_raw)) def approx_median(arg: Expr, distinct: bool = False) -> Expr: @@ -1705,20 +1717,22 @@ def regr_syy(y: Expr, x: Expr, distinct: bool = False) -> Expr: def first_value( arg: Expr, distinct: bool = False, - filter: Optional[bool] = None, + filter: Optional[Expr] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, + null_treatment: Optional[NullTreatment] = None, ) -> Expr: """Returns the first value in a group of values.""" - order_by_cols = [e.expr for e in order_by] if order_by is not None else None + order_by_raw = expr_list_to_raw_expr_list(order_by) + filter_raw = filter.expr if filter is not None else None + null_treatment_raw = null_treatment.value if null_treatment is not None else None return Expr( f.first_value( arg.expr, distinct=distinct, - filter=filter, - order_by=order_by_cols, - null_treatment=null_treatment, + filter=filter_raw, + order_by=order_by_raw, + null_treatment=null_treatment_raw, ) ) @@ -1726,24 +1740,26 @@ def first_value( def last_value( arg: Expr, distinct: bool = False, - filter: Optional[bool] = None, + filter: Optional[Expr] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[common.NullTreatment] = None, + null_treatment: NullTreatment = NullTreatment.RESPECT_NULLS, ) -> Expr: """Returns the last value in a group of values. To set parameters on this expression, use ``.order_by()``, ``.distinct()``, ``.filter()``, or ``.null_treatment()``. """ - order_by_cols = [e.expr for e in order_by] if order_by is not None else None + order_by_raw = expr_list_to_raw_expr_list(order_by) + filter_raw = filter.expr if filter is not None else None + null_treatment_raw = null_treatment.value if null_treatment is not None else None return Expr( f.last_value( arg.expr, distinct=distinct, - filter=filter, - order_by=order_by_cols, - null_treatment=null_treatment, + filter=filter_raw, + order_by=order_by_raw, + null_treatment=null_treatment_raw, ) ) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index ab653c40..06fbbab1 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -98,6 +98,13 @@ def test_aggregation_stats(df, agg_expr, calc_expected): "agg_expr, expected", [ (f.approx_distinct(column("b")), pa.array([2], type=pa.uint64())), + ( + f.approx_distinct( + column("b"), + filter=column("a") != lit(3), + ), + pa.array([1], type=pa.uint64()), + ), (f.approx_median(column("b")), pa.array([4])), (f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])), ( diff --git a/python/datafusion/tests/test_wrapper_coverage.py b/python/datafusion/tests/test_wrapper_coverage.py index 44b9ca83..8c371638 100644 --- a/python/datafusion/tests/test_wrapper_coverage.py +++ b/python/datafusion/tests/test_wrapper_coverage.py @@ -19,9 +19,15 @@ import datafusion.functions import datafusion.object_store import datafusion.substrait +from enum import EnumType def missing_exports(internal_obj, wrapped_obj) -> None: + # Special case enums - just make sure they exist since dir() + # and other functions get overridden. + if isinstance(wrapped_obj, EnumType): + return + for attr in dir(internal_obj): assert attr in dir(wrapped_obj) diff --git a/src/functions.rs b/src/functions.rs index b5b003df..c973fa3b 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -40,8 +40,10 @@ use datafusion::logical_expr::{ }; #[pyfunction] -pub fn approx_distinct(expression: PyExpr) -> PyExpr { - functions_aggregate::expr_fn::approx_distinct(expression.expr).into() +pub fn approx_distinct(expression: PyExpr, filter: Option) -> PyResult { + let agg_fn = functions_aggregate::expr_fn::approx_distinct(expression.expr); + + add_builder_fns_to_aggregate(agg_fn, false, filter, None, None) } #[pyfunction] From 3b96b9d2a859ece13449bb41869f1db49191a5bc Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 4 Sep 2024 08:14:08 -0400 Subject: [PATCH 02/35] Small usability on aggregate --- python/datafusion/dataframe.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 46b8fa1b..56dff22a 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -180,7 +180,9 @@ def with_column_renamed(self, old_name: str, new_name: str) -> DataFrame: """ return DataFrame(self.df.with_column_renamed(old_name, new_name)) - def aggregate(self, group_by: list[Expr], aggs: list[Expr]) -> DataFrame: + def aggregate( + self, group_by: list[Expr] | Expr, aggs: list[Expr] | Expr + ) -> DataFrame: """Aggregates the rows of the current DataFrame. Args: @@ -190,6 +192,9 @@ def aggregate(self, group_by: list[Expr], aggs: list[Expr]) -> DataFrame: Returns: DataFrame after aggregation. """ + group_by = group_by if isinstance(group_by, list) else [group_by] + aggs = aggs if isinstance(aggs, list) else [aggs] + group_by = [e.expr for e in group_by] aggs = [e.expr for e in aggs] return DataFrame(self.df.aggregate(group_by, aggs)) From c434b4e51f22b3cb800cb09416185c37551de7fc Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 6 Sep 2024 06:33:12 -0400 Subject: [PATCH 03/35] Adding documentation and additional unit test for approx_median --- python/datafusion/functions.py | 32 ++++++++++++++++++--- python/datafusion/tests/test_aggregation.py | 7 ++++- src/functions.rs | 27 ++++++++++------- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index a2620edc..5b9f0ca9 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1497,15 +1497,39 @@ def approx_distinct( expression: Expr, filter: Optional[Expr] = None, ) -> Expr: - """Returns the approximate number of distinct values.""" + """Returns the approximate number of distinct values. + + This aggregate function is similar to :py:func:`count` with distinct set, but it + will approximate the number of distinct entries. It may return significantly faster + than :py:func:`count` for some DataFrames. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Values to check for distinct entries + filter: If provided, only compute against rows for which the filter is true + """ filter_raw = filter.expr if filter is not None else None return Expr(f.approx_distinct(expression.expr, filter=filter_raw)) -def approx_median(arg: Expr, distinct: bool = False) -> Expr: - """Returns the approximate median value.""" - return Expr(f.approx_median(arg.expr, distinct=distinct)) +def approx_median( + expression: Expr, distinct: bool = False, filter: Optional[Expr] = None +) -> Expr: + """Returns the approximate median value. + + This aggregate function is similar to :py:func:`median`, but it will only + approximate the median. It may return significantly faster for some DataFrames. + + Args: + expression: Values to find the median for + distinct: If True, only return the median of distinct values + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.approx_median(expression.expr, distinct=distinct, filter=filter_raw)) def approx_percentile_cont( diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 06fbbab1..89094057 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -34,8 +34,9 @@ def df(): pa.array([4, 4, 6]), pa.array([9, 8, 5]), pa.array([True, True, False]), + pa.array([1, None, None]), ], - names=["a", "b", "c", "d"], + names=["a", "b", "c", "d", "e"], ) return ctx.create_dataframe([[batch]]) @@ -87,6 +88,7 @@ def df_aggregate_100(): ], ) def test_aggregation_stats(df, agg_expr, calc_expected): + df = df.select("a", "b", "c", "d") agg_df = df.aggregate([], [agg_expr]) result = agg_df.collect()[0] values_a, values_b, values_c, values_d = df.collect()[0] @@ -106,6 +108,8 @@ def test_aggregation_stats(df, agg_expr, calc_expected): pa.array([1], type=pa.uint64()), ), (f.approx_median(column("b")), pa.array([4])), + (f.approx_median(column("b"), distinct=True), pa.array([5])), + (f.approx_median(column("b"), filter=column("a") != 2), pa.array([5])), (f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])), ( f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)), @@ -116,6 +120,7 @@ def test_aggregation_stats(df, agg_expr, calc_expected): ) def test_aggregation(df, agg_expr, expected): agg_df = df.aggregate([], [agg_expr]) + agg_df.show() result = agg_df.collect()[0] assert result.column(0) == expected diff --git a/src/functions.rs b/src/functions.rs index c973fa3b..60c71732 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -47,13 +47,14 @@ pub fn approx_distinct(expression: PyExpr, filter: Option) -> PyResult

PyResult { - let expr = functions_aggregate::expr_fn::approx_median(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } +pub fn approx_median( + expression: PyExpr, + distinct: bool, + filter: Option, +) -> PyResult { + let agg_fn = functions_aggregate::expr_fn::approx_median(expression.expr); + + add_builder_fns_to_aggregate(agg_fn, distinct, filter, None, None) } #[pyfunction] @@ -330,10 +331,14 @@ fn add_builder_fns_to_aggregate( ) -> PyResult { // Since ExprFuncBuilder::new() is private, we can guarantee initializing // a builder with an `order_by` default of empty vec - let order_by = order_by - .map(|x| x.into_iter().map(|x| x.expr).collect::>()) - .unwrap_or_default(); - let mut builder = agg_fn.order_by(order_by); + // let order_by = order_by + // .map(|x| x.into_iter().map(|x| x.expr).collect::>()) + // .unwrap_or_default(); + let mut builder = agg_fn.null_treatment(None); + + if let Some(ob) = order_by { + builder = builder.order_by(ob.into_iter().map(|e| e.expr).collect()); + } if distinct { builder = builder.distinct(); From 1a5e1383f6bd1d187d516ba2dbca8933dd178280 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 6 Sep 2024 07:54:23 -0400 Subject: [PATCH 04/35] Update approx_percentil_cont with builder parameters it uses, which is filter but not distinct --- python/datafusion/functions.py | 42 ++++++++++++++------- python/datafusion/tests/test_aggregation.py | 16 +++++++- src/functions.rs | 19 ++++------ 3 files changed, 50 insertions(+), 27 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 5b9f0ca9..11ba4f39 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1523,6 +1523,9 @@ def approx_median( This aggregate function is similar to :py:func:`median`, but it will only approximate the median. It may return significantly faster for some DataFrames. + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by`` and ``null_treatment``. + Args: expression: Values to find the median for distinct: If True, only return the median of distinct values @@ -1534,24 +1537,35 @@ def approx_median( def approx_percentile_cont( expression: Expr, - percentile: Expr, - num_centroids: Expr | None = None, - distinct: bool = False, + percentile: float, + num_centroids: Optional[int] = None, + filter: Optional[Expr] = None, ) -> Expr: - """Returns the value that is approximately at a given percentile of ``expr``.""" - if num_centroids is None: - return Expr( - f.approx_percentile_cont( - expression.expr, percentile.expr, distinct=distinct, num_centroids=None - ) - ) + """Returns the value that is approximately at a given percentile of ``expr``. + + This aggregate function assumes the input values form a continuous distribution. + Suppose you have a DataFrame which consists of 100 different test scores. If you + called this function with a percentile of 0.9, it would return the value of the + test score that is above 90% of the other test scores. The returned value may be + between two of the values. + This function uses the [t-digest](https://arxiv.org/abs/1902.04023) algorithm to + compute the percentil. You can limit the number of bins used in this algorithm by + setting the ``num_centroids`` parameter. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Values for which to find the approximate percentile + percentile: This must be between 0.0 and 1.0, inclusive + num_centroids: Max bin size for the t-digest algorithm + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None return Expr( f.approx_percentile_cont( - expression.expr, - percentile.expr, - distinct=distinct, - num_centroids=num_centroids.expr, + expression.expr, percentile, num_centroids=num_centroids, filter=filter_raw ) ) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 89094057..9715dfb1 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -110,7 +110,7 @@ def test_aggregation_stats(df, agg_expr, calc_expected): (f.approx_median(column("b")), pa.array([4])), (f.approx_median(column("b"), distinct=True), pa.array([5])), (f.approx_median(column("b"), filter=column("a") != 2), pa.array([5])), - (f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])), + (f.approx_percentile_cont(column("b"), 0.5), pa.array([4])), ( f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)), pa.array([6], type=pa.float64()), @@ -131,7 +131,17 @@ def test_aggregate_100(df_aggregate_100): result = ( df_aggregate_100.aggregate( [column("c1")], - [f.approx_percentile_cont(column("c3"), lit(0.95), lit(200)).alias("c3")], + [ + f.approx_percentile_cont(column("c3"), 0.95, num_centroids=200).alias( + "c3" + ), + f.approx_percentile_cont(column("c3"), 0.95, num_centroids=5).alias( + "c4" + ), + f.approx_percentile_cont( + column("c3"), 0.95, num_centroids=200, filter=column("c3") > lit(0) + ).alias("c5"), + ], ) .sort(column("c1").sort(ascending=True)) .collect() @@ -141,6 +151,8 @@ def test_aggregate_100(df_aggregate_100): result = result[0] assert result.column("c1") == pa.array(["a", "b", "c", "d", "e"]) assert result.column("c3") == pa.array([73, 68, 122, 124, 115]) + assert result.column("c4") == pa.array([72, 68, 119, 124, 115]) + assert result.column("c5") == pa.array([83, 68, 122, 124, 117]) def test_bit_add_or_xor(df): diff --git a/src/functions.rs b/src/functions.rs index 60c71732..7c572248 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -60,22 +60,19 @@ pub fn approx_median( #[pyfunction] pub fn approx_percentile_cont( expression: PyExpr, - percentile: PyExpr, - distinct: bool, - num_centroids: Option, // enforces optional arguments at the end, currently + percentile: f64, + num_centroids: Option, // enforces optional arguments at the end, currently + filter: Option, ) -> PyResult { let args = if let Some(num_centroids) = num_centroids { - vec![expression.expr, percentile.expr, num_centroids.expr] + vec![expression.expr, lit(percentile), lit(num_centroids)] } else { - vec![expression.expr, percentile.expr] + vec![expression.expr, lit(percentile)] }; let udaf = functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf(); - let expr = udaf.call(args); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } + let agg_fn = udaf.call(args); + + add_builder_fns_to_aggregate(agg_fn, false, filter, None, None) } #[pyfunction] From c931c065fb4cc94b58b5ed7b386a7caf947ac8d7 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 6 Sep 2024 08:06:09 -0400 Subject: [PATCH 05/35] Update approx_percentil_cont_with_weight with builder parameters it uses, which is filter but not distinct --- python/datafusion/functions.py | 21 ++++++++++++++++----- python/datafusion/tests/test_aggregation.py | 9 +++++++-- src/functions.rs | 15 ++++++--------- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 11ba4f39..e8ed7f02 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1571,16 +1571,27 @@ def approx_percentile_cont( def approx_percentile_cont_with_weight( - arg: Expr, weight: Expr, percentile: Expr, distinct: bool = False + expression: Expr, weight: Expr, percentile: float, filter: Optional[Expr] = None ) -> Expr: - """Returns the value of the approximate percentile. + """Returns the value of the weighted approximate percentile. + + This aggregate function is similar to :py:func:`approx_percentile_cont` except that + it uses the associated associated weights. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Values for which to find the approximate percentile + weight: Relative weight for each of the values in ``expression`` + percentile: This must be between 0.0 and 1.0, inclusive + filter: If provided, only compute against rows for which the filter is true - This function is similar to :py:func:`approx_percentile_cont` except that it uses - the associated associated weights. """ + filter_raw = filter.expr if filter is not None else None return Expr( f.approx_percentile_cont_with_weight( - arg.expr, weight.expr, percentile.expr, distinct=distinct + expression.expr, weight.expr, percentile, filter=filter_raw ) ) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 9715dfb1..d8a39e31 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -112,15 +112,20 @@ def test_aggregation_stats(df, agg_expr, calc_expected): (f.approx_median(column("b"), filter=column("a") != 2), pa.array([5])), (f.approx_percentile_cont(column("b"), 0.5), pa.array([4])), ( - f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)), + f.approx_percentile_cont_with_weight(column("b"), lit(0.6), 0.5), pa.array([6], type=pa.float64()), ), + ( + f.approx_percentile_cont_with_weight( + column("b"), lit(0.6), 0.5, filter=column("a") != lit(3) + ), + pa.array([4], type=pa.float64()), + ), (f.array_agg(column("b")), pa.array([[4, 4, 6]])), ], ) def test_aggregation(df, agg_expr, expected): agg_df = df.aggregate([], [agg_expr]) - agg_df.show() result = agg_df.collect()[0] assert result.column(0) == expected diff --git a/src/functions.rs b/src/functions.rs index 7c572248..b76f5f76 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -79,19 +79,16 @@ pub fn approx_percentile_cont( pub fn approx_percentile_cont_with_weight( expression: PyExpr, weight: PyExpr, - percentile: PyExpr, - distinct: bool, + percentile: f64, + filter: Option, ) -> PyResult { - let expr = functions_aggregate::expr_fn::approx_percentile_cont_with_weight( + let agg_fn = functions_aggregate::expr_fn::approx_percentile_cont_with_weight( expression.expr, weight.expr, - percentile.expr, + lit(percentile), ); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } + + add_builder_fns_to_aggregate(agg_fn, false, filter, None, None) } #[pyfunction] From 2cc7c94685718e3b4946033cc4b57ddc802bb368 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 07:27:03 -0400 Subject: [PATCH 06/35] Update array_agg to use aggregate options --- python/datafusion/functions.py | 41 +++++++++--- python/datafusion/tests/test_aggregation.py | 36 +++++++--- src/functions.rs | 74 +++++++++------------ 3 files changed, 91 insertions(+), 60 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index e8ed7f02..d0a437ef 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1515,24 +1515,21 @@ def approx_distinct( return Expr(f.approx_distinct(expression.expr, filter=filter_raw)) -def approx_median( - expression: Expr, distinct: bool = False, filter: Optional[Expr] = None -) -> Expr: +def approx_median(expression: Expr, filter: Optional[Expr] = None) -> Expr: """Returns the approximate median value. This aggregate function is similar to :py:func:`median`, but it will only approximate the median. It may return significantly faster for some DataFrames. If using the builder functions described in ref:`_aggregation` this function ignores - the options ``order_by`` and ``null_treatment``. + the options ``order_by`` and ``null_treatment``, and ``distinct``. Args: expression: Values to find the median for - distinct: If True, only return the median of distinct values filter: If provided, only compute against rows for which the filter is true """ filter_raw = filter.expr if filter is not None else None - return Expr(f.approx_median(expression.expr, distinct=distinct, filter=filter_raw)) + return Expr(f.approx_median(expression.expr, filter=filter_raw)) def approx_percentile_cont( @@ -1596,9 +1593,35 @@ def approx_percentile_cont_with_weight( ) -def array_agg(arg: Expr, distinct: bool = False) -> Expr: - """Aggregate values into an array.""" - return Expr(f.array_agg(arg.expr, distinct=distinct)) +def array_agg( + expression: Expr, + distinct: bool = False, + filter: Optional[Expr] = None, + order_by: Optional[list[Expr]] = None, +) -> Expr: + """Aggregate values into an array. + + Currently ``distinct`` and ``order_by`` cannot be used together. As a work around, + consider :py:func:`array_sort` after aggregation. + [Issue Tracker](https://github.com/apache/datafusion/issues/12371) + + If using the builder functions described in ref:`_aggregation` this function ignores + the option ``null_treatment``. + + Args: + expression: Values to combine into an array + distinct: If True, a single entry for each distinct value will be in the result + filter: If provided, only compute against rows for which the filter is true + order_by: Order the resultant array values + """ + order_by_raw = expr_list_to_raw_expr_list(order_by) + filter_raw = filter.expr if filter is not None else None + + return Expr( + f.array_agg( + expression.expr, distinct=distinct, filter=filter_raw, order_by=order_by_raw + ) + ) def avg(arg: Expr, distinct: bool = False) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index d8a39e31..f79838c0 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -97,36 +97,54 @@ def test_aggregation_stats(df, agg_expr, calc_expected): @pytest.mark.parametrize( - "agg_expr, expected", + "agg_expr, expected, array_sort", [ - (f.approx_distinct(column("b")), pa.array([2], type=pa.uint64())), + (f.approx_distinct(column("b")), pa.array([2], type=pa.uint64()), False), ( f.approx_distinct( column("b"), filter=column("a") != lit(3), ), pa.array([1], type=pa.uint64()), + False, ), - (f.approx_median(column("b")), pa.array([4])), - (f.approx_median(column("b"), distinct=True), pa.array([5])), - (f.approx_median(column("b"), filter=column("a") != 2), pa.array([5])), - (f.approx_percentile_cont(column("b"), 0.5), pa.array([4])), + (f.approx_median(column("b")), pa.array([4]), False), + (f.approx_median(column("b"), filter=column("a") != 2), pa.array([5]), False), + (f.approx_percentile_cont(column("b"), 0.5), pa.array([4]), False), ( f.approx_percentile_cont_with_weight(column("b"), lit(0.6), 0.5), pa.array([6], type=pa.float64()), + False, ), ( f.approx_percentile_cont_with_weight( column("b"), lit(0.6), 0.5, filter=column("a") != lit(3) ), pa.array([4], type=pa.float64()), + False, + ), + (f.array_agg(column("b")), pa.array([[4, 4, 6]]), False), + (f.array_agg(column("b"), distinct=True), pa.array([[4, 6]]), True), + ( + f.array_agg(column("e"), filter=column("e").is_not_null()), + pa.array([[1]]), + False, + ), + ( + f.array_agg(column("b"), order_by=[column("c")]), + pa.array([[6, 4, 4]]), + False, ), - (f.array_agg(column("b")), pa.array([[4, 4, 6]])), ], ) -def test_aggregation(df, agg_expr, expected): - agg_df = df.aggregate([], [agg_expr]) +def test_aggregation(df, agg_expr, expected, array_sort): + agg_df = df.aggregate([], [agg_expr.alias("agg_expr")]) + if array_sort: + agg_df = agg_df.select(f.array_sort(column("agg_expr"))) + agg_df.show() result = agg_df.collect()[0] + + print(result) assert result.column(0) == expected diff --git a/src/functions.rs b/src/functions.rs index b76f5f76..9da1fe9f 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -43,18 +43,14 @@ use datafusion::logical_expr::{ pub fn approx_distinct(expression: PyExpr, filter: Option) -> PyResult { let agg_fn = functions_aggregate::expr_fn::approx_distinct(expression.expr); - add_builder_fns_to_aggregate(agg_fn, false, filter, None, None) + add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } #[pyfunction] -pub fn approx_median( - expression: PyExpr, - distinct: bool, - filter: Option, -) -> PyResult { +pub fn approx_median(expression: PyExpr, filter: Option) -> PyResult { let agg_fn = functions_aggregate::expr_fn::approx_median(expression.expr); - add_builder_fns_to_aggregate(agg_fn, distinct, filter, None, None) + add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } #[pyfunction] @@ -72,7 +68,7 @@ pub fn approx_percentile_cont( let udaf = functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf(); let agg_fn = udaf.call(args); - add_builder_fns_to_aggregate(agg_fn, false, filter, None, None) + add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } #[pyfunction] @@ -88,17 +84,7 @@ pub fn approx_percentile_cont_with_weight( lit(percentile), ); - add_builder_fns_to_aggregate(agg_fn, false, filter, None, None) -} - -#[pyfunction] -pub fn avg(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::avg(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } + add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } #[pyfunction] @@ -318,7 +304,7 @@ pub fn regr_syy(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult, filter: Option, order_by: Option>, null_treatment: Option, @@ -330,11 +316,12 @@ fn add_builder_fns_to_aggregate( // .unwrap_or_default(); let mut builder = agg_fn.null_treatment(None); - if let Some(ob) = order_by { - builder = builder.order_by(ob.into_iter().map(|e| e.expr).collect()); + if let Some(order_by_cols) = order_by { + let order_by_cols = to_sort_expressions(order_by_cols); + builder = builder.order_by(order_by_cols); } - if distinct { + if let Some(true) = distinct { builder = builder.distinct(); } @@ -351,7 +338,7 @@ fn add_builder_fns_to_aggregate( #[pyfunction] pub fn first_value( expr: PyExpr, - distinct: bool, + distinct: Option, filter: Option, order_by: Option>, null_treatment: Option, @@ -365,7 +352,7 @@ pub fn first_value( #[pyfunction] pub fn last_value( expr: PyExpr, - distinct: bool, + distinct: Option, filter: Option, order_by: Option>, null_treatment: Option, @@ -647,24 +634,26 @@ fn window( }) } +// Generates a [pyo3] wrapper for associated aggregate functions. +// All of the builder options are exposed to the python internal +// function and we rely on the wrappers to only use those that +// are appropriate. macro_rules! aggregate_function { ($NAME: ident, $FUNC: path) => { - aggregate_function!($NAME, $FUNC, stringify!($NAME)); + aggregate_function!($NAME, $FUNC, expr); }; - ($NAME: ident, $FUNC: path, $DOC: expr) => { - #[doc = $DOC] + ($NAME: ident, $FUNC: path, $($arg:ident)*) => { #[pyfunction] - #[pyo3(signature = (*args, distinct=false))] - fn $NAME(args: Vec, distinct: bool) -> PyExpr { - let expr = datafusion::logical_expr::Expr::AggregateFunction(AggregateFunction { - func: $FUNC(), - args: args.into_iter().map(|e| e.into()).collect(), - distinct, - filter: None, - order_by: None, - null_treatment: None, - }); - expr.into() + fn $NAME( + $($arg: PyExpr),*, + distinct: Option, + filter: Option, + order_by: Option>, + null_treatment: Option + ) -> PyResult { + let agg_fn = $FUNC($($arg.into()),*); + + add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) } }; } @@ -892,9 +881,10 @@ array_fn!(array_resize, array size value); array_fn!(flatten, array); array_fn!(range, start stop step); -aggregate_function!(array_agg, functions_aggregate::array_agg::array_agg_udaf); -aggregate_function!(max, functions_aggregate::min_max::max_udaf); -aggregate_function!(min, functions_aggregate::min_max::min_udaf); +aggregate_function!(array_agg, functions_aggregate::array_agg::array_agg); +aggregate_function!(max, functions_aggregate::min_max::max); +aggregate_function!(min, functions_aggregate::min_max::min); +aggregate_function!(avg, functions_aggregate::expr_fn::avg); fn add_builder_fns_to_window( window_fn: Expr, From 52e33acb5fec79278a3207d642e5b4a067e1e080 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 07:40:24 -0400 Subject: [PATCH 07/35] Update builder options for avg aggregate function --- python/datafusion/functions.py | 24 ++++++++++++++++----- python/datafusion/tests/test_aggregation.py | 5 +++-- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index d0a437ef..9ca36988 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1624,9 +1624,23 @@ def array_agg( ) -def avg(arg: Expr, distinct: bool = False) -> Expr: - """Returns the average value.""" - return Expr(f.avg(arg.expr, distinct=distinct)) +def avg( + expression: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Returns the average value. + + This aggregate function expects a numeric expression and will return a float. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Values to combine into an array + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.avg(expression.expr, filter=filter_raw)) def corr(value1: Expr, value2: Expr, distinct: bool = False) -> Expr: @@ -1676,12 +1690,12 @@ def max(arg: Expr, distinct: bool = False) -> Expr: return Expr(f.max(arg.expr, distinct=distinct)) -def mean(arg: Expr, distinct: bool = False) -> Expr: +def mean(expression: Expr, filter: Optional[Expr] = None) -> Expr: """Returns the average (mean) value of the argument. This is an alias for :py:func:`avg`. """ - return avg(arg, distinct) + return avg(expression, filter) def median(arg: Expr) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index f79838c0..0e2fedd9 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -34,7 +34,7 @@ def df(): pa.array([4, 4, 6]), pa.array([9, 8, 5]), pa.array([True, True, False]), - pa.array([1, None, None]), + pa.array([1, 2, None]), ], names=["a", "b", "c", "d", "e"], ) @@ -127,7 +127,7 @@ def test_aggregation_stats(df, agg_expr, calc_expected): (f.array_agg(column("b"), distinct=True), pa.array([[4, 6]]), True), ( f.array_agg(column("e"), filter=column("e").is_not_null()), - pa.array([[1]]), + pa.array([[1, 2]]), False, ), ( @@ -135,6 +135,7 @@ def test_aggregation_stats(df, agg_expr, calc_expected): pa.array([[6, 4, 4]]), False, ), + (f.avg(column("b"), filter=column("a") != lit(1)), pa.array([5.0]), False), ], ) def test_aggregation(df, agg_expr, expected, array_sort): From 37016318bc12b04e14884c7eefe4992c20f3faec Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 07:42:24 -0400 Subject: [PATCH 08/35] move bit_and bit_or to use macro to generaty python fn --- src/functions.rs | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index 9da1fe9f..d45edcb5 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -87,26 +87,6 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -#[pyfunction] -pub fn bit_and(expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::bit_and(expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn bit_or(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::bit_or(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - #[pyfunction] pub fn bit_xor(expression: PyExpr, distinct: bool) -> PyResult { let expr = functions_aggregate::expr_fn::bit_xor(expression.expr); @@ -885,6 +865,8 @@ aggregate_function!(array_agg, functions_aggregate::array_agg::array_agg); aggregate_function!(max, functions_aggregate::min_max::max); aggregate_function!(min, functions_aggregate::min_max::min); aggregate_function!(avg, functions_aggregate::expr_fn::avg); +aggregate_function!(bit_and, functions_aggregate::expr_fn::bit_and); +aggregate_function!(bit_or, functions_aggregate::expr_fn::bit_or); fn add_builder_fns_to_window( window_fn: Expr, From 176092c0b73ab443dd49ec28fb16d051ea395bc6 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 08:13:17 -0400 Subject: [PATCH 09/35] Update builder arguments for bitwise operators --- python/datafusion/functions.py | 54 +++++++++++++++++---- python/datafusion/tests/test_aggregation.py | 40 +++++++++------ src/functions.rs | 11 +---- 3 files changed, 71 insertions(+), 34 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 9ca36988..26a7f4dd 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1850,19 +1850,55 @@ def last_value( ) -def bit_and(arg: Expr, distinct: bool = False) -> Expr: - """Computes the bitwise AND of the argument.""" - return Expr(f.bit_and(arg.expr, distinct=distinct)) +def bit_and(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the bitwise AND of the argument. + This aggregate function will bitwise compare every value in the input partition. -def bit_or(arg: Expr, distinct: bool = False) -> Expr: - """Computes the bitwise OR of the argument.""" - return Expr(f.bit_or(arg.expr, distinct=distinct)) + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Argument to perform bitwise calculation on + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.bit_and(expression.expr, filter=filter_raw)) -def bit_xor(arg: Expr, distinct: bool = False) -> Expr: - """Computes the bitwise XOR of the argument.""" - return Expr(f.bit_xor(arg.expr, distinct=distinct)) +def bit_or(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the bitwise OR of the argument. + + This aggregate function will bitwise compare every value in the input partition. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Argument to perform bitwise calculation on + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.bit_or(expression.expr, filter=filter_raw)) + + +def bit_xor( + expression: Expr, distinct: bool = False, filter: Optional[Expr] = None +) -> Expr: + """Computes the bitwise XOR of the argument. + + This aggregate function will bitwise compare every value in the input partition. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by`` and ``null_treatment``. + + Args: + expression: Argument to perform bitwise calculation on + distinct: If True, evaluate each unique value of expression only once + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.bit_xor(expression.expr, distinct=distinct, filter=filter_raw)) def bool_and(arg: Expr, distinct: bool = False) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 0e2fedd9..88406909 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -179,21 +179,31 @@ def test_aggregate_100(df_aggregate_100): assert result.column("c5") == pa.array([83, 68, 122, 124, 117]) -def test_bit_add_or_xor(df): - df = df.aggregate( - [], - [ - f.bit_and(column("a")), - f.bit_or(column("b")), - f.bit_xor(column("c")), - ], - ) - - result = df.collect() - result = result[0] - assert result.column(0) == pa.array([0]) - assert result.column(1) == pa.array([6]) - assert result.column(2) == pa.array([4]) +data_test_bitwise_functions = [ + ("bit_and", f.bit_and(column("a")), [0]), + ("bit_and_filter", f.bit_and(column("a"), filter=column("a") != lit(2)), [1]), + ("bit_or", f.bit_or(column("b")), [6]), + ("bit_or_filter", f.bit_or(column("b"), filter=column("a") != lit(3)), [4]), + ("bit_xor", f.bit_xor(column("c")), [4]), + ("bit_xor_distinct", f.bit_xor(column("b"), distinct=True), [2]), + ("bit_xor_filter", f.bit_xor(column("b"), filter=column("a") != lit(3)), [0]), + ( + "bit_xor_filter_distinct", + f.bit_xor(column("b"), distinct=True, filter=column("a") != lit(3)), + [4], + ), +] + + +@pytest.mark.parametrize("name,expr,result", data_test_bitwise_functions) +def test_bit_add_or_xor(df, name, expr, result): + df = df.aggregate([], [expr.alias(name)]) + + expected = { + name: result, + } + + assert df.collect()[0].to_pydict() == expected def test_bool_and_or(df): diff --git a/src/functions.rs b/src/functions.rs index d45edcb5..e890b888 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -87,16 +87,6 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -#[pyfunction] -pub fn bit_xor(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::bit_xor(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - #[pyfunction] pub fn bool_and(expression: PyExpr, distinct: bool) -> PyResult { let expr = functions_aggregate::expr_fn::bool_and(expression.expr); @@ -867,6 +857,7 @@ aggregate_function!(min, functions_aggregate::min_max::min); aggregate_function!(avg, functions_aggregate::expr_fn::avg); aggregate_function!(bit_and, functions_aggregate::expr_fn::bit_and); aggregate_function!(bit_or, functions_aggregate::expr_fn::bit_or); +aggregate_function!(bit_xor, functions_aggregate::expr_fn::bit_xor); fn add_builder_fns_to_window( window_fn: Expr, From fdee791cac3bdd9d932584832ec42c2c74fe744e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 08:18:36 -0400 Subject: [PATCH 10/35] Use macro for bool_and and bool_or --- src/functions.rs | 39 ++++++++++++--------------------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index e890b888..602cac32 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -87,26 +87,6 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -#[pyfunction] -pub fn bool_and(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::bool_and(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn bool_or(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::bool_or(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - #[pyfunction] pub fn corr(y: PyExpr, x: PyExpr, distinct: bool) -> PyResult { let expr = functions_aggregate::expr_fn::corr(y.expr, x.expr); @@ -609,6 +589,9 @@ fn window( // function and we rely on the wrappers to only use those that // are appropriate. macro_rules! aggregate_function { + ($NAME: ident) => { + aggregate_function!($NAME, functions_aggregate::expr_fn::$NAME, expr); + }; ($NAME: ident, $FUNC: path) => { aggregate_function!($NAME, $FUNC, expr); }; @@ -851,13 +834,15 @@ array_fn!(array_resize, array size value); array_fn!(flatten, array); array_fn!(range, start stop step); -aggregate_function!(array_agg, functions_aggregate::array_agg::array_agg); -aggregate_function!(max, functions_aggregate::min_max::max); -aggregate_function!(min, functions_aggregate::min_max::min); -aggregate_function!(avg, functions_aggregate::expr_fn::avg); -aggregate_function!(bit_and, functions_aggregate::expr_fn::bit_and); -aggregate_function!(bit_or, functions_aggregate::expr_fn::bit_or); -aggregate_function!(bit_xor, functions_aggregate::expr_fn::bit_xor); +aggregate_function!(array_agg); +aggregate_function!(max); +aggregate_function!(min); +aggregate_function!(avg); +aggregate_function!(bit_and); +aggregate_function!(bit_or); +aggregate_function!(bit_xor); +aggregate_function!(bool_and); +aggregate_function!(bool_or); fn add_builder_fns_to_window( window_fn: Expr, From 4f9373627d7513fbaaf312326e7af61da6115ae3 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 08:25:21 -0400 Subject: [PATCH 11/35] Update python wrapper for arguments appropriate to bool operators --- python/datafusion/functions.py | 36 +++++++++++++++++---- python/datafusion/tests/test_aggregation.py | 24 ++++---------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 26a7f4dd..559266c8 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1901,14 +1901,38 @@ def bit_xor( return Expr(f.bit_xor(expression.expr, distinct=distinct, filter=filter_raw)) -def bool_and(arg: Expr, distinct: bool = False) -> Expr: - """Computes the boolean AND of the argument.""" - return Expr(f.bool_and(arg.expr, distinct=distinct)) +def bool_and(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the boolean AND of the argument. + This aggregate function will compare every value in the input partition. These are + expected to be boolean values. -def bool_or(arg: Expr, distinct: bool = False) -> Expr: - """Computes the boolean OR of the argument.""" - return Expr(f.bool_or(arg.expr, distinct=distinct)) + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Argument to perform calculation on + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.bool_and(expression.expr, filter=filter_raw)) + + +def bool_or(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the boolean OR of the argument. + + This aggregate function will compare every value in the input partition. These are + expected to be boolean values. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Argument to perform calculation on + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.bool_or(expression.expr, filter=filter_raw)) def lead( diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 88406909..bc56c7a2 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -179,7 +179,7 @@ def test_aggregate_100(df_aggregate_100): assert result.column("c5") == pa.array([83, 68, 122, 124, 117]) -data_test_bitwise_functions = [ +data_test_bitwise_and_boolean_functions = [ ("bit_and", f.bit_and(column("a")), [0]), ("bit_and_filter", f.bit_and(column("a"), filter=column("a") != lit(2)), [1]), ("bit_or", f.bit_or(column("b")), [6]), @@ -192,11 +192,15 @@ def test_aggregate_100(df_aggregate_100): f.bit_xor(column("b"), distinct=True, filter=column("a") != lit(3)), [4], ), + ("bool_and", f.bool_and(column("d")), [False]), + ("bool_and_filter", f.bool_and(column("d"), filter=column("a") != lit(3)), [True]), + ("bool_or", f.bool_or(column("d")), [True]), + ("bool_or_filter", f.bool_or(column("d"), filter=column("a") == lit(3)), [False]), ] -@pytest.mark.parametrize("name,expr,result", data_test_bitwise_functions) -def test_bit_add_or_xor(df, name, expr, result): +@pytest.mark.parametrize("name,expr,result", data_test_bitwise_and_boolean_functions) +def test_bit_and_bool_fns(df, name, expr, result): df = df.aggregate([], [expr.alias(name)]) expected = { @@ -204,17 +208,3 @@ def test_bit_add_or_xor(df, name, expr, result): } assert df.collect()[0].to_pydict() == expected - - -def test_bool_and_or(df): - df = df.aggregate( - [], - [ - f.bool_and(column("d")), - f.bool_or(column("d")), - ], - ) - result = df.collect() - result = result[0] - assert result.column(0) == pa.array([False]) - assert result.column(1) == pa.array([True]) From 62f3d2c26d5f5ec783a7c055a6694220dd85cc67 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 08:35:07 -0400 Subject: [PATCH 12/35] Set corr to use macro for pyfunction --- src/functions.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index 602cac32..7d2ff6ef 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -87,16 +87,6 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -#[pyfunction] -pub fn corr(y: PyExpr, x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::corr(y.expr, x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - #[pyfunction] pub fn grouping(expression: PyExpr, distinct: bool) -> PyResult { let expr = functions_aggregate::expr_fn::grouping(expression.expr); @@ -590,12 +580,9 @@ fn window( // are appropriate. macro_rules! aggregate_function { ($NAME: ident) => { - aggregate_function!($NAME, functions_aggregate::expr_fn::$NAME, expr); - }; - ($NAME: ident, $FUNC: path) => { - aggregate_function!($NAME, $FUNC, expr); + aggregate_function!($NAME, expr); }; - ($NAME: ident, $FUNC: path, $($arg:ident)*) => { + ($NAME: ident, $($arg:ident)*) => { #[pyfunction] fn $NAME( $($arg: PyExpr),*, @@ -604,7 +591,7 @@ macro_rules! aggregate_function { order_by: Option>, null_treatment: Option ) -> PyResult { - let agg_fn = $FUNC($($arg.into()),*); + let agg_fn = functions_aggregate::expr_fn::$NAME($($arg.into()),*); add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) } @@ -843,6 +830,7 @@ aggregate_function!(bit_or); aggregate_function!(bit_xor); aggregate_function!(bool_and); aggregate_function!(bool_or); +aggregate_function!(corr, y x); fn add_builder_fns_to_window( window_fn: Expr, From 32d8dddea43fdea1179f34048f88ddffd934c495 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 08:42:39 -0400 Subject: [PATCH 13/35] Update unit test to make it easier to debug --- python/datafusion/tests/test_aggregation.py | 53 +++++++++++++-------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index bc56c7a2..8cb0f1a5 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -149,34 +149,47 @@ def test_aggregation(df, agg_expr, expected, array_sort): assert result.column(0) == expected -def test_aggregate_100(df_aggregate_100): +@pytest.mark.parametrize( + "name,expr,expected", + [ + ( + "approx_percentile_cont", + f.approx_percentile_cont(column("c3"), 0.95, num_centroids=200), + [73, 68, 122, 124, 115], + ), + ( + "approx_perc_cont_few_centroids", + f.approx_percentile_cont(column("c3"), 0.95, num_centroids=5), + [72, 68, 119, 124, 115], + ), + ( + "approx_perc_cont_filtered", + f.approx_percentile_cont( + column("c3"), 0.95, num_centroids=200, filter=column("c3") > lit(0) + ), + [83, 68, 122, 124, 117], + ), + ], +) +def test_aggregate_100(df_aggregate_100, name, expr, expected): # https://github.com/apache/datafusion/blob/bddb6415a50746d2803dd908d19c3758952d74f9/datafusion/sqllogictest/test_files/aggregate.slt#L1490-L1498 - result = ( + df = ( df_aggregate_100.aggregate( [column("c1")], - [ - f.approx_percentile_cont(column("c3"), 0.95, num_centroids=200).alias( - "c3" - ), - f.approx_percentile_cont(column("c3"), 0.95, num_centroids=5).alias( - "c4" - ), - f.approx_percentile_cont( - column("c3"), 0.95, num_centroids=200, filter=column("c3") > lit(0) - ).alias("c5"), - ], + [expr.alias(name)], ) + .select("c1", f.round(column(name), lit(4)).alias(name)) .sort(column("c1").sort(ascending=True)) - .collect() ) + df.show() + + expected_dict = { + "c1": ["a", "b", "c", "d", "e"], + name: expected, + } - assert len(result) == 1 - result = result[0] - assert result.column("c1") == pa.array(["a", "b", "c", "d", "e"]) - assert result.column("c3") == pa.array([73, 68, 122, 124, 115]) - assert result.column("c4") == pa.array([72, 68, 119, 124, 115]) - assert result.column("c5") == pa.array([83, 68, 122, 124, 117]) + assert df.collect()[0].to_pydict() == expected_dict data_test_bitwise_and_boolean_functions = [ From 9543626f687fed9989dd45ad8504cf7bec62bc46 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 09:02:00 -0400 Subject: [PATCH 14/35] Update corr python wrapper to expose only builder parameters used --- python/datafusion/functions.py | 18 +++++++++++++++--- python/datafusion/tests/test_aggregation.py | 10 ++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 559266c8..35e33ba2 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1643,9 +1643,21 @@ def avg( return Expr(f.avg(expression.expr, filter=filter_raw)) -def corr(value1: Expr, value2: Expr, distinct: bool = False) -> Expr: - """Returns the correlation coefficient between ``value1`` and ``value2``.""" - return Expr(f.corr(value1.expr, value2.expr, distinct=distinct)) +def corr(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: + """Returns the correlation coefficient between ``value1`` and ``value2``. + + This aggregate function expects both values to be numeric and will return a float. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + value_y: The dependent variable for correlation + value_x: The independent variable for correlation + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.corr(value_y.expr, value_x.expr, filter=filter_raw)) def count(args: Expr | list[Expr] | None = None, distinct: bool = False) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 8cb0f1a5..59551053 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -169,6 +169,16 @@ def test_aggregation(df, agg_expr, expected, array_sort): ), [83, 68, 122, 124, 117], ), + ( + "corr", + f.corr(column("c3"), column("c2")), + [-0.1056, -0.2808, 0.0023, 0.0022, -0.2473], + ), + ( + "corr_w_filter", + f.corr(column("c3"), column("c2"), filter=column("c3") > lit(0)), + [-0.3298, 0.2925, 0.2467, -0.2269, 0.0358], + ), ], ) def test_aggregate_100(df_aggregate_100, name, expr, expected): From 8d16a3cb0994a5003a2ff9db9cd1ec0ef46b5208 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 09:04:39 -0400 Subject: [PATCH 15/35] Update count and count_star to use macro for exposing --- python/datafusion/functions.py | 2 +- src/functions.rs | 21 +-------------------- 2 files changed, 2 insertions(+), 21 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 35e33ba2..72b7cd0f 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -367,7 +367,7 @@ def col(name: str) -> Expr: def count_star() -> Expr: """Create a COUNT(1) aggregate expression.""" - return Expr(f.count_star()) + return Expr(f.count(Expr.literal(1))) def case(expr: Expr) -> CaseBuilder: diff --git a/src/functions.rs b/src/functions.rs index 7d2ff6ef..038d5289 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -433,25 +433,6 @@ fn col(name: &str) -> PyResult { }) } -// TODO: should we just expose this in python? -/// Create a COUNT(1) aggregate expression -#[pyfunction] -fn count_star() -> PyExpr { - functions_aggregate::expr_fn::count(lit(1)).into() -} - -/// Wrapper for [`functions_aggregate::expr_fn::count`] -/// Count the number of non-null values in the column -#[pyfunction] -fn count(expr: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::count(expr.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression. #[pyfunction] fn case(expr: PyExpr) -> PyResult { @@ -831,6 +812,7 @@ aggregate_function!(bit_xor); aggregate_function!(bool_and); aggregate_function!(bool_or); aggregate_function!(corr, y x); +aggregate_function!(count); fn add_builder_fns_to_window( window_fn: Expr, @@ -979,7 +961,6 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(cosh))?; m.add_wrapped(wrap_pyfunction!(cot))?; m.add_wrapped(wrap_pyfunction!(count))?; - m.add_wrapped(wrap_pyfunction!(count_star))?; m.add_wrapped(wrap_pyfunction!(covar_pop))?; m.add_wrapped(wrap_pyfunction!(covar_samp))?; m.add_wrapped(wrap_pyfunction!(current_date))?; From 55ebc179e0f9987ccb83bc8486583a014354dab5 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 09:25:52 -0400 Subject: [PATCH 16/35] Update count and count_star with approprate aggregation options --- python/datafusion/functions.py | 51 ++++++++++++++++----- python/datafusion/tests/test_aggregation.py | 5 ++ 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 72b7cd0f..ef95d361 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -365,9 +365,18 @@ def col(name: str) -> Expr: return Expr(f.col(name)) -def count_star() -> Expr: - """Create a COUNT(1) aggregate expression.""" - return Expr(f.count(Expr.literal(1))) +def count_star(filter: Optional[Expr] = None) -> Expr: + """Create a COUNT(1) aggregate expression. + + This aggregate function will count all of the rows in the partition. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``distinct``, and ``null_treatment``. + + Args: + filter: If provided, only count rows for which the filter is true + """ + return count(Expr.literal(1), filter=filter) def case(expr: Expr) -> CaseBuilder: @@ -1660,15 +1669,33 @@ def corr(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: return Expr(f.corr(value_y.expr, value_x.expr, filter=filter_raw)) -def count(args: Expr | list[Expr] | None = None, distinct: bool = False) -> Expr: - """Returns the number of rows that match the given arguments.""" - if args is None: - return count(Expr.literal(1), distinct=distinct) - if isinstance(args, list): - args = [arg.expr for arg in args] - elif isinstance(args, Expr): - args = [args.expr] - return Expr(f.count(*args, distinct=distinct)) +def count( + expressions: Expr | list[Expr] | None = None, + distinct: bool = False, + filter: Optional[Expr] = None, +) -> Expr: + """Returns the number of rows that match the given arguments. + + This aggregate function will count the non-null rows provided in the expression. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by`` and ``null_treatment``. + + Args: + expressions: Argument to perform bitwise calculation on + distinct: If True, a single entry for each distinct value will be in the result + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + + if expressions is None: + args = [Expr.literal(1).expr] + elif isinstance(expressions, list): + args = [arg.expr for arg in expressions] + else: + args = [expressions.expr] + + return Expr(f.count(*args, distinct=distinct, filter=filter_raw)) def covar(y: Expr, x: Expr) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 59551053..0eafe087 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -136,6 +136,11 @@ def test_aggregation_stats(df, agg_expr, calc_expected): False, ), (f.avg(column("b"), filter=column("a") != lit(1)), pa.array([5.0]), False), + (f.count(column("b"), distinct=True), pa.array([2]), False), + (f.count(column("b"), filter=column("a") != 3), pa.array([2]), False), + (f.count(), pa.array([3]), False), + (f.count(column("e")), pa.array([2]), False), + (f.count_star(filter=column("a") != 3), pa.array([2]), False), ], ) def test_aggregation(df, agg_expr, expected, array_sort): From 7e42e6cf58f66cdedd1e94ebbe4a99b8808c3f00 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 09:26:18 -0400 Subject: [PATCH 17/35] Move covar_pop and covar_samp to use macro for aggregates --- src/functions.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index 038d5289..8ac8593d 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -102,16 +102,6 @@ pub fn sum(args: PyExpr) -> PyExpr { functions_aggregate::expr_fn::sum(args.expr).into() } -#[pyfunction] -pub fn covar_samp(y: PyExpr, x: PyExpr) -> PyExpr { - functions_aggregate::expr_fn::covar_samp(y.expr, x.expr).into() -} - -#[pyfunction] -pub fn covar_pop(y: PyExpr, x: PyExpr) -> PyExpr { - functions_aggregate::expr_fn::covar_pop(y.expr, x.expr).into() -} - #[pyfunction] pub fn median(arg: PyExpr) -> PyExpr { functions_aggregate::expr_fn::median(arg.expr).into() @@ -813,6 +803,8 @@ aggregate_function!(bool_and); aggregate_function!(bool_or); aggregate_function!(corr, y x); aggregate_function!(count); +aggregate_function!(covar_samp, y x); +aggregate_function!(covar_pop, y x); fn add_builder_fns_to_window( window_fn: Expr, From ceb65c6220177c180cf23dbac6a83a4d6adc9dae Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 10:15:18 -0400 Subject: [PATCH 18/35] Updateing covar_pop and covar_samp with builder option --- python/datafusion/functions.py | 44 ++++++++++++++++----- python/datafusion/tests/test_aggregation.py | 20 ++++++++++ 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index ef95d361..59d3efb2 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1698,22 +1698,46 @@ def count( return Expr(f.count(*args, distinct=distinct, filter=filter_raw)) -def covar(y: Expr, x: Expr) -> Expr: - """Computes the sample covariance. +def covar_pop(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the population covariance. - This is an alias for :py:func:`covar_samp`. + This aggregate function expects both values to be numeric and will return a float. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + value_y: The dependent variable for covariance + value_x: The independent variable for covariance + filter: If provided, only compute against rows for which the filter is true """ - return covar_samp(y, x) + filter_raw = filter.expr if filter is not None else None + return Expr(f.covar_pop(value_y.expr, value_x.expr, filter=filter_raw)) -def covar_pop(y: Expr, x: Expr) -> Expr: - """Computes the population covariance.""" - return Expr(f.covar_pop(y.expr, x.expr)) +def covar_samp(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the sample covariance. + This aggregate function expects both values to be numeric and will return a float. -def covar_samp(y: Expr, x: Expr) -> Expr: - """Computes the sample covariance.""" - return Expr(f.covar_samp(y.expr, x.expr)) + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + value_y: The dependent variable for covariance + value_x: The independent variable for covariance + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.covar_samp(value_y.expr, value_x.expr, filter=filter_raw)) + + +def covar(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the sample covariance. + + This is an alias for :py:func:`covar_samp`. + """ + return covar_samp(value_y, value_x, filter) def grouping(arg: Expr, distinct: bool = False) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 0eafe087..00ff7ca0 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -184,6 +184,26 @@ def test_aggregation(df, agg_expr, expected, array_sort): f.corr(column("c3"), column("c2"), filter=column("c3") > lit(0)), [-0.3298, 0.2925, 0.2467, -0.2269, 0.0358], ), + ( + "covar_pop", + f.covar_pop(column("c3"), column("c2")), + [-7.2857, -25.6731, 0.2222, 0.2469, -20.2857], + ), + ( + "covar_pop_w_filter", + f.covar_pop(column("c3"), column("c2"), filter=column("c3") > lit(0)), + [-9.25, 9.0579, 13.7521, -9.9669, 1.1641], + ), + ( + "covar_samp", + f.covar_samp(column("c3"), column("c2")), + [-7.65, -27.0994, 0.2333, 0.2614, -21.3], + ), + ( + "covar_samp_w_filter", + f.covar_samp(column("c3"), column("c2"), filter=column("c3") > lit(0)), + [-10.5714, 9.9636, 15.1273, -10.9636, 1.2417], + ), ], ) def test_aggregate_100(df_aggregate_100, name, expr, expected): From b7262baa66a362a122a1b40393f00312ec60c0e2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 10:26:42 -0400 Subject: [PATCH 19/35] Use macro for last_value and move first_value to be near it --- src/functions.rs | 66 ++++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index 8ac8593d..528ec991 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -35,7 +35,7 @@ use datafusion::functions_aggregate; use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment; use datafusion::logical_expr::{ - expr::{find_df_window_func, AggregateFunction, Sort, WindowFunction}, + expr::{find_df_window_func, Sort, WindowFunction}, lit, Expr, WindowFunctionDefinition, }; @@ -265,33 +265,6 @@ fn add_builder_fns_to_aggregate( Ok(builder.build()?.into()) } -#[pyfunction] -pub fn first_value( - expr: PyExpr, - distinct: Option, - filter: Option, - order_by: Option>, - null_treatment: Option, -) -> PyResult { - // If we initialize the UDAF with order_by directly, then it gets over-written by the builder - let agg_fn = functions_aggregate::expr_fn::first_value(expr.expr, None); - - add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) -} - -#[pyfunction] -pub fn last_value( - expr: PyExpr, - distinct: Option, - filter: Option, - order_by: Option>, - null_treatment: Option, -) -> PyResult { - let agg_fn = functions_aggregate::expr_fn::last_value(vec![expr.expr]); - - add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) -} - #[pyfunction] fn in_list(expr: PyExpr, value: Vec, negated: bool) -> PyExpr { datafusion::logical_expr::in_list( @@ -569,6 +542,26 @@ macro_rules! aggregate_function { }; } +macro_rules! aggregate_function_vec_args { + ($NAME: ident) => { + aggregate_function_vec_args!($NAME, expr); + }; + ($NAME: ident, $($arg:ident)*) => { + #[pyfunction] + fn $NAME( + $($arg: PyExpr),*, + distinct: Option, + filter: Option, + order_by: Option>, + null_treatment: Option + ) -> PyResult { + let agg_fn = functions_aggregate::expr_fn::$NAME(vec![$($arg.into()),*]); + + add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) + } + }; +} + /// Generates a [pyo3] wrapper for [datafusion::functions::expr_fn] /// /// These functions have explicit named arguments. @@ -805,6 +798,23 @@ aggregate_function!(corr, y x); aggregate_function!(count); aggregate_function!(covar_samp, y x); aggregate_function!(covar_pop, y x); +aggregate_function_vec_args!(last_value); + +// We handle first_value explicitly because the signature expects an order_by +// https://github.com/apache/datafusion/issues/12376 +#[pyfunction] +pub fn first_value( + expr: PyExpr, + distinct: Option, + filter: Option, + order_by: Option>, + null_treatment: Option, +) -> PyResult { + // If we initialize the UDAF with order_by directly, then it gets over-written by the builder + let agg_fn = functions_aggregate::expr_fn::first_value(expr.expr, None); + + add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) +} fn add_builder_fns_to_window( window_fn: Expr, From 91e5f7d2fd72a1e8348504b01ab7bd7e056aaf15 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 10:45:40 -0400 Subject: [PATCH 20/35] Update first_value and last_value with the builder parameters that are relevant --- python/datafusion/functions.py | 46 ++++++++---- python/datafusion/tests/test_aggregation.py | 83 +++++++++++++++++++++ python/datafusion/tests/test_functions.py | 24 ------ 3 files changed, 113 insertions(+), 40 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 59d3efb2..8b917d00 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1864,51 +1864,65 @@ def regr_syy(y: Expr, x: Expr, distinct: bool = False) -> Expr: def first_value( - arg: Expr, - distinct: bool = False, + expression: Expr, filter: Optional[Expr] = None, order_by: Optional[list[Expr]] = None, - null_treatment: Optional[NullTreatment] = None, + null_treatment: NullTreatment = NullTreatment.RESPECT_NULLS, ) -> Expr: - """Returns the first value in a group of values.""" + """Returns the first value in a group of values. + + This aggregate function will return the first value in the partition. + + If using the builder functions described in ref:`_aggregation` this function ignores + the option ``distinct``. + + Args: + expression: Argument to perform bitwise calculation on + filter: If provided, only compute against rows for which the filter is true + order_by: Set the ordering of the expression to evaluate + null_treatment: Assign whether to respect or ignull null values. + """ order_by_raw = expr_list_to_raw_expr_list(order_by) filter_raw = filter.expr if filter is not None else None - null_treatment_raw = null_treatment.value if null_treatment is not None else None return Expr( f.first_value( - arg.expr, - distinct=distinct, + expression.expr, filter=filter_raw, order_by=order_by_raw, - null_treatment=null_treatment_raw, + null_treatment=null_treatment.value, ) ) def last_value( - arg: Expr, - distinct: bool = False, + expression: Expr, filter: Optional[Expr] = None, order_by: Optional[list[Expr]] = None, null_treatment: NullTreatment = NullTreatment.RESPECT_NULLS, ) -> Expr: """Returns the last value in a group of values. - To set parameters on this expression, use ``.order_by()``, ``.distinct()``, - ``.filter()``, or ``.null_treatment()``. + This aggregate function will return the last value in the partition. + + If using the builder functions described in ref:`_aggregation` this function ignores + the option ``distinct``. + + Args: + expression: Argument to perform bitwise calculation on + filter: If provided, only compute against rows for which the filter is true + order_by: Set the ordering of the expression to evaluate + null_treatment: Assign whether to respect or ignull null values. """ order_by_raw = expr_list_to_raw_expr_list(order_by) filter_raw = filter.expr if filter is not None else None - null_treatment_raw = null_treatment.value if null_treatment is not None else None return Expr( f.last_value( - arg.expr, - distinct=distinct, + expression.expr, filter=filter_raw, order_by=order_by_raw, - null_treatment=null_treatment_raw, + null_treatment=null_treatment.value, ) ) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 00ff7ca0..e859e17f 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -21,6 +21,7 @@ from datafusion import SessionContext, column, lit from datafusion import functions as f +from datafusion.common import NullTreatment @pytest.fixture @@ -41,6 +42,23 @@ def df(): return ctx.create_dataframe([[batch]]) +@pytest.fixture +def df_partitioned(): + ctx = SessionContext() + + # create a RecordBatch and a new DataFrame from it + batch = pa.RecordBatch.from_arrays( + [ + pa.array([0, 1, 2, 3, 4, 5, 6]), + pa.array([7, None, 7, 8, 9, None, 9]), + pa.array(["A", "A", "A", "A", "B", "B", "B"]), + ], + names=["a", "b", "c"], + ) + + return ctx.create_dataframe([[batch]]) + + @pytest.fixture def df_aggregate_100(): ctx = SessionContext() @@ -256,3 +274,68 @@ def test_bit_and_bool_fns(df, name, expr, result): } assert df.collect()[0].to_pydict() == expected + + +@pytest.mark.parametrize( + "name,expr,result", + [ + ("first_value", f.first_value(column("a")), [0, 4]), + ( + "first_value_ordered", + f.first_value(column("a"), order_by=[column("a").sort(ascending=False)]), + [3, 6], + ), + ( + "first_value_with_null", + f.first_value( + column("b"), + order_by=[column("b").sort(ascending=True)], + null_treatment=NullTreatment.RESPECT_NULLS, + ), + [None, None], + ), + ( + "first_value_ignore_null", + f.first_value( + column("b"), + order_by=[column("b").sort(ascending=True)], + null_treatment=NullTreatment.IGNORE_NULLS, + ), + [7, 9], + ), + ("last_value", f.last_value(column("a")), [3, 6]), + ( + "last_value_ordered", + f.last_value(column("a"), order_by=[column("a").sort(ascending=False)]), + [0, 4], + ), + ( + "last_value_with_null", + f.last_value( + column("b"), + order_by=[column("b").sort(ascending=True, nulls_first=False)], + null_treatment=NullTreatment.RESPECT_NULLS, + ), + [None, None], + ), + ( + "last_value_ignore_null", + f.last_value( + column("b"), + order_by=[column("b").sort(ascending=True)], + null_treatment=NullTreatment.IGNORE_NULLS, + ), + [8, 9], + ), + ], +) +def test_first_last_value(df_partitioned, name, expr, result) -> None: + df = df_partitioned.aggregate([column("c")], [expr.alias(name)]).sort(column("c")) + df.show() + + expected = { + "c": ["A", "B"], + name: result, + } + + assert df.collect()[0].to_pydict() == expected diff --git a/python/datafusion/tests/test_functions.py b/python/datafusion/tests/test_functions.py index e7e6d79e..bc5d50cc 100644 --- a/python/datafusion/tests/test_functions.py +++ b/python/datafusion/tests/test_functions.py @@ -942,30 +942,6 @@ def test_regr_funcs_df(func, expected): assert result_df[0].column(0) == expected -def test_first_last_value(df): - df = df.aggregate( - [], - [ - f.first_value(column("a")), - f.first_value(column("b")), - f.first_value(column("d")), - f.last_value(column("a")), - f.last_value(column("b")), - f.last_value(column("d")), - ], - ) - - result = df.collect() - result = result[0] - assert result.column(0) == pa.array(["Hello"]) - assert result.column(1) == pa.array([4]) - assert result.column(2) == pa.array([datetime(2022, 12, 31)]) - assert result.column(3) == pa.array(["!"]) - assert result.column(4) == pa.array([6]) - assert result.column(5) == pa.array([datetime(2020, 7, 2)]) - df.show() - - def test_binary_string_functions(df): df = df.select( f.encode(column("a"), literal("base64")), From fde7e7068d5674680bdf07592dfa3bdaf565e785 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 12:44:38 -0400 Subject: [PATCH 21/35] Remove grouping since it is not actually implemented upstream --- python/datafusion/functions.py | 9 --------- src/functions.rs | 17 ++++++----------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 8b917d00..95e12305 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -127,7 +127,6 @@ "floor", "from_unixtime", "gcd", - "grouping", "in_list", "initcap", "isnan", @@ -1740,14 +1739,6 @@ def covar(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: return covar_samp(value_y, value_x, filter) -def grouping(arg: Expr, distinct: bool = False) -> Expr: - """Indicates if the expression is aggregated or not. - - Returns 1 if the value of the argument is aggregated, 0 if not. - """ - return Expr(f.grouping(arg.expr, distinct=distinct)) - - def max(arg: Expr, distinct: bool = False) -> Expr: """Returns the maximum value of the argument.""" return Expr(f.max(arg.expr, distinct=distinct)) diff --git a/src/functions.rs b/src/functions.rs index 528ec991..d2413475 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -87,16 +87,6 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -#[pyfunction] -pub fn grouping(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::grouping(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - #[pyfunction] pub fn sum(args: PyExpr) -> PyExpr { functions_aggregate::expr_fn::sum(args.expr).into() @@ -798,6 +788,11 @@ aggregate_function!(corr, y x); aggregate_function!(count); aggregate_function!(covar_samp, y x); aggregate_function!(covar_pop, y x); + +// Code is commented out since grouping is not yet implemented +// https://github.com/apache/datafusion-python/issues/861 +// aggregate_function!(grouping); + aggregate_function_vec_args!(last_value); // We handle first_value explicitly because the signature expects an order_by @@ -979,7 +974,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(floor))?; m.add_wrapped(wrap_pyfunction!(from_unixtime))?; m.add_wrapped(wrap_pyfunction!(gcd))?; - m.add_wrapped(wrap_pyfunction!(grouping))?; + // m.add_wrapped(wrap_pyfunction!(grouping))?; m.add_wrapped(wrap_pyfunction!(in_list))?; m.add_wrapped(wrap_pyfunction!(initcap))?; m.add_wrapped(wrap_pyfunction!(isnan))?; From 22826d4250d1f8786a11bc70fc3ac4823cac4395 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 12:47:31 -0400 Subject: [PATCH 22/35] Move median to use macro --- src/functions.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index d2413475..2348289b 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -92,11 +92,6 @@ pub fn sum(args: PyExpr) -> PyExpr { functions_aggregate::expr_fn::sum(args.expr).into() } -#[pyfunction] -pub fn median(arg: PyExpr) -> PyExpr { - functions_aggregate::expr_fn::median(arg.expr).into() -} - #[pyfunction] pub fn stddev(expression: PyExpr, distinct: bool) -> PyResult { let expr = functions_aggregate::expr_fn::stddev(expression.expr); @@ -788,6 +783,7 @@ aggregate_function!(corr, y x); aggregate_function!(count); aggregate_function!(covar_samp, y x); aggregate_function!(covar_pop, y x); +aggregate_function!(median); // Code is commented out since grouping is not yet implemented // https://github.com/apache/datafusion-python/issues/861 From 85df127acef062a0e2a500465bbf1c94b3e196fb Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 13:18:13 -0400 Subject: [PATCH 23/35] Expose builder options for median --- python/datafusion/functions.py | 21 ++++++++++++++++++--- python/datafusion/tests/test_aggregation.py | 2 ++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 95e12305..7b72b690 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1752,9 +1752,24 @@ def mean(expression: Expr, filter: Optional[Expr] = None) -> Expr: return avg(expression, filter) -def median(arg: Expr) -> Expr: - """Computes the median of a set of numbers.""" - return Expr(f.median(arg.expr)) +def median( + expression: Expr, distinct: bool = False, filter: Optional[Expr] = None +) -> Expr: + """Computes the median of a set of numbers. + + This aggregate function returns the median value of the expression for the given + aggregate function. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by`` and ``null_treatment``. + + Args: + expression: The value to compute the median of + distinct: If True, a single entry for each distinct value will be in the result + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.median(expression.expr, distinct=distinct, filter=filter_raw)) def min(arg: Expr, distinct: bool = False) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index e859e17f..2a864e58 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -127,6 +127,8 @@ def test_aggregation_stats(df, agg_expr, calc_expected): False, ), (f.approx_median(column("b")), pa.array([4]), False), + (f.median(column("b"), distinct=True), pa.array([5]), False), + (f.median(column("b"), filter=column("a") != 2), pa.array([5]), False), (f.approx_median(column("b"), filter=column("a") != 2), pa.array([5]), False), (f.approx_percentile_cont(column("b"), 0.5), pa.array([4]), False), ( From 3296e1aeb99c1872c9f3d8d48e5a1c58aa638e81 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 7 Sep 2024 18:11:28 -0400 Subject: [PATCH 24/35] Expose nth value --- python/datafusion/functions.py | 66 +++++++++++++++++++-- python/datafusion/tests/test_aggregation.py | 28 +++++++++ src/functions.rs | 15 +++++ 3 files changed, 103 insertions(+), 6 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 7b72b690..f2d0a029 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -180,6 +180,7 @@ "named_struct", "nanvl", "now", + "nth_value", "nullif", "octet_length", "order_by", @@ -1739,9 +1740,18 @@ def covar(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: return covar_samp(value_y, value_x, filter) -def max(arg: Expr, distinct: bool = False) -> Expr: - """Returns the maximum value of the argument.""" - return Expr(f.max(arg.expr, distinct=distinct)) +def max(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Aggregate function that returns the maximum value of the argument. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: The value to find the maximum of + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.max(expression.expr, filter=filter_raw)) def mean(expression: Expr, filter: Optional[Expr] = None) -> Expr: @@ -1772,9 +1782,18 @@ def median( return Expr(f.median(expression.expr, distinct=distinct, filter=filter_raw)) -def min(arg: Expr, distinct: bool = False) -> Expr: - """Returns the minimum value of the argument.""" - return Expr(f.min(arg.expr, distinct=distinct)) +def min(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Returns the minimum value of the argument. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: The value to find the minimum of + filter: If provided, only compute against rows for which the filter is true + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.min(expression.expr, filter=filter_raw)) def sum(arg: Expr) -> Expr: @@ -1933,6 +1952,41 @@ def last_value( ) +def nth_value( + expression: Expr, + n: int, + filter: Optional[Expr] = None, + order_by: Optional[list[Expr]] = None, + null_treatment: NullTreatment = NullTreatment.RESPECT_NULLS, +) -> Expr: + """Returns the n-th value in a group of values. + + This aggregate function will return the n-th value in the partition. + + If using the builder functions described in ref:`_aggregation` this function ignores + the option ``distinct``. + + Args: + expression: Argument to perform bitwise calculation on + n: Index of value to return. Starts at 1. + filter: If provided, only compute against rows for which the filter is true + order_by: Set the ordering of the expression to evaluate + null_treatment: Assign whether to respect or ignull null values. + """ + order_by_raw = expr_list_to_raw_expr_list(order_by) + filter_raw = filter.expr if filter is not None else None + + return Expr( + f.nth_value( + expression.expr, + n, + filter=filter_raw, + order_by=order_by_raw, + null_treatment=null_treatment.value, + ) + ) + + def bit_and(expression: Expr, filter: Optional[Expr] = None) -> Expr: """Computes the bitwise AND of the argument. diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 2a864e58..529f98cb 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -161,6 +161,8 @@ def test_aggregation_stats(df, agg_expr, calc_expected): (f.count(), pa.array([3]), False), (f.count(column("e")), pa.array([2]), False), (f.count_star(filter=column("a") != 3), pa.array([2]), False), + (f.max(column("a"), filter=column("a") != lit(3)), pa.array([2]), False), + (f.min(column("a"), filter=column("a") != lit(1)), pa.array([2]), False), ], ) def test_aggregation(df, agg_expr, expected, array_sort): @@ -329,6 +331,32 @@ def test_bit_and_bool_fns(df, name, expr, result): ), [8, 9], ), + ("first_value", f.first_value(column("a")), [0, 4]), + ( + "nth_value_ordered", + f.nth_value(column("a"), 2, order_by=[column("a").sort(ascending=False)]), + [2, 5], + ), + ( + "nth_value_with_null", + f.nth_value( + column("b"), + 3, + order_by=[column("b").sort(ascending=True, nulls_first=False)], + null_treatment=NullTreatment.RESPECT_NULLS, + ), + [8, None], + ), + ( + "nth_value_ignore_null", + f.nth_value( + column("b"), + 2, + order_by=[column("b").sort(ascending=True)], + null_treatment=NullTreatment.IGNORE_NULLS, + ), + [7, 9], + ), ], ) def test_first_last_value(df_partitioned, name, expr, result) -> None: diff --git a/src/functions.rs b/src/functions.rs index 2348289b..d9a0c511 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -807,6 +807,20 @@ pub fn first_value( add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) } +// nth_value requires a non-expr argument +#[pyfunction] +pub fn nth_value( + expr: PyExpr, + n: i64, + distinct: Option, + filter: Option, + order_by: Option>, + null_treatment: Option, +) -> PyResult { + let agg_fn = datafusion::functions_aggregate::nth_value::nth_value(vec![expr.expr, lit(n)]); + add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) +} + fn add_builder_fns_to_window( window_fn: Expr, partition_by: Option>, @@ -1058,6 +1072,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(regr_syy))?; m.add_wrapped(wrap_pyfunction!(first_value))?; m.add_wrapped(wrap_pyfunction!(last_value))?; + m.add_wrapped(wrap_pyfunction!(nth_value))?; m.add_wrapped(wrap_pyfunction!(bit_and))?; m.add_wrapped(wrap_pyfunction!(bit_or))?; m.add_wrapped(wrap_pyfunction!(bit_xor))?; From a0e24b412625f4e25303ff88101df4591059c335 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 08:02:20 -0400 Subject: [PATCH 25/35] Updating linear regression functions to use filter and macro --- python/datafusion/functions.py | 254 ++++++++++++++++---- python/datafusion/tests/test_aggregation.py | 1 - python/datafusion/tests/test_functions.py | 77 +++++- src/functions.rs | 99 +------- 4 files changed, 278 insertions(+), 153 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index f2d0a029..035d3dc8 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -374,7 +374,7 @@ def count_star(filter: Optional[Expr] = None) -> Expr: the options ``order_by``, ``distinct``, and ``null_treatment``. Args: - filter: If provided, only count rows for which the filter is true + filter: If provided, only count rows for which the filter is True """ return count(Expr.literal(1), filter=filter) @@ -1517,7 +1517,7 @@ def approx_distinct( Args: expression: Values to check for distinct entries - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None @@ -1535,7 +1535,7 @@ def approx_median(expression: Expr, filter: Optional[Expr] = None) -> Expr: Args: expression: Values to find the median for - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.approx_median(expression.expr, filter=filter_raw)) @@ -1566,7 +1566,7 @@ def approx_percentile_cont( expression: Values for which to find the approximate percentile percentile: This must be between 0.0 and 1.0, inclusive num_centroids: Max bin size for the t-digest algorithm - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr( @@ -1591,7 +1591,7 @@ def approx_percentile_cont_with_weight( expression: Values for which to find the approximate percentile weight: Relative weight for each of the values in ``expression`` percentile: This must be between 0.0 and 1.0, inclusive - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None @@ -1620,7 +1620,7 @@ def array_agg( Args: expression: Values to combine into an array distinct: If True, a single entry for each distinct value will be in the result - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True order_by: Order the resultant array values """ order_by_raw = expr_list_to_raw_expr_list(order_by) @@ -1646,7 +1646,7 @@ def avg( Args: expression: Values to combine into an array - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.avg(expression.expr, filter=filter_raw)) @@ -1663,7 +1663,7 @@ def corr(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Expr: Args: value_y: The dependent variable for correlation value_x: The independent variable for correlation - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.corr(value_y.expr, value_x.expr, filter=filter_raw)) @@ -1684,7 +1684,7 @@ def count( Args: expressions: Argument to perform bitwise calculation on distinct: If True, a single entry for each distinct value will be in the result - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None @@ -1709,7 +1709,7 @@ def covar_pop(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> Ex Args: value_y: The dependent variable for covariance value_x: The independent variable for covariance - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.covar_pop(value_y.expr, value_x.expr, filter=filter_raw)) @@ -1726,7 +1726,7 @@ def covar_samp(value_y: Expr, value_x: Expr, filter: Optional[Expr] = None) -> E Args: value_y: The dependent variable for covariance value_x: The independent variable for covariance - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.covar_samp(value_y.expr, value_x.expr, filter=filter_raw)) @@ -1748,7 +1748,7 @@ def max(expression: Expr, filter: Optional[Expr] = None) -> Expr: Args: expression: The value to find the maximum of - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.max(expression.expr, filter=filter_raw)) @@ -1776,7 +1776,7 @@ def median( Args: expression: The value to compute the median of distinct: If True, a single entry for each distinct value will be in the result - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.median(expression.expr, distinct=distinct, filter=filter_raw)) @@ -1790,7 +1790,7 @@ def min(expression: Expr, filter: Optional[Expr] = None) -> Expr: Args: expression: The value to find the minimum of - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.min(expression.expr, filter=filter_raw)) @@ -1837,55 +1837,211 @@ def var_samp(arg: Expr) -> Expr: return Expr(f.var_samp(arg.expr)) -def regr_avgx(y: Expr, x: Expr, distinct: bool = False) -> Expr: +def regr_avgx( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: """Computes the average of the independent variable ``x``. - Only non-null pairs of the inputs are evaluated. + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True """ - return Expr(f.regr_avgx(y.expr, x.expr, distinct)) + filter_raw = filter.expr if filter is not None else None + + return Expr(f.regr_avgx(y.expr, x.expr, filter=filter_raw)) -def regr_avgy(y: Expr, x: Expr, distinct: bool = False) -> Expr: +def regr_avgy( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: """Computes the average of the dependent variable ``y``. - Only non-null pairs of the inputs are evaluated. + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + + return Expr(f.regr_avgy(y.expr, x.expr, filter=filter_raw)) + + +def regr_count( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Counts the number of rows in which both expressions are not null. + + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True """ - return Expr(f.regr_avgy(y.expr, x.expr, distinct)) + filter_raw = filter.expr if filter is not None else None + return Expr(f.regr_count(y.expr, x.expr, filter=filter_raw)) -def regr_count(y: Expr, x: Expr, distinct: bool = False) -> Expr: - """Counts the number of rows in which both expressions are not null.""" - return Expr(f.regr_count(y.expr, x.expr, distinct)) +def regr_intercept( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Computes the intercept from the linear regression. + + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. -def regr_intercept(y: Expr, x: Expr, distinct: bool = False) -> Expr: - """Computes the intercept from the linear regression.""" - return Expr(f.regr_intercept(y.expr, x.expr, distinct)) + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.regr_intercept(y.expr, x.expr, filter=filter_raw)) -def regr_r2(y: Expr, x: Expr, distinct: bool = False) -> Expr: - """Computes the R-squared value from linear regression.""" - return Expr(f.regr_r2(y.expr, x.expr, distinct)) +def regr_r2( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Computes the R-squared value from linear regression. -def regr_slope(y: Expr, x: Expr, distinct: bool = False) -> Expr: - """Computes the slope from linear regression.""" - return Expr(f.regr_slope(y.expr, x.expr, distinct)) + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. -def regr_sxx(y: Expr, x: Expr, distinct: bool = False) -> Expr: - """Computes the sum of squares of the independent variable ``x``.""" - return Expr(f.regr_sxx(y.expr, x.expr, distinct)) + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.regr_r2(y.expr, x.expr, filter=filter_raw)) -def regr_sxy(y: Expr, x: Expr, distinct: bool = False) -> Expr: - """Computes the sum of products of pairs of numbers.""" - return Expr(f.regr_sxy(y.expr, x.expr, distinct)) +def regr_slope( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Computes the slope from linear regression. + + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + + return Expr(f.regr_slope(y.expr, x.expr, filter=filter_raw)) + + +def regr_sxx( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Computes the sum of squares of the independent variable ``x``. + + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + + return Expr(f.regr_sxx(y.expr, x.expr, filter=filter_raw)) + + +def regr_sxy( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Computes the sum of products of pairs of numbers. + + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + + return Expr(f.regr_sxy(y.expr, x.expr, filter=filter_raw)) + + +def regr_syy( + y: Expr, + x: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Computes the sum of squares of the dependent variable ``y``. + + This is a linear regression aggregate function. Only non-null pairs of the inputs + are evaluated. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + y: The linear regression dependent variable + x: The linear regression independent variable + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None -def regr_syy(y: Expr, x: Expr, distinct: bool = False) -> Expr: - """Computes the sum of squares of the dependent variable ``y``.""" - return Expr(f.regr_syy(y.expr, x.expr, distinct)) + return Expr(f.regr_syy(y.expr, x.expr, filter=filter_raw)) def first_value( @@ -1903,7 +2059,7 @@ def first_value( Args: expression: Argument to perform bitwise calculation on - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True order_by: Set the ordering of the expression to evaluate null_treatment: Assign whether to respect or ignull null values. """ @@ -1935,7 +2091,7 @@ def last_value( Args: expression: Argument to perform bitwise calculation on - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True order_by: Set the ordering of the expression to evaluate null_treatment: Assign whether to respect or ignull null values. """ @@ -1969,7 +2125,7 @@ def nth_value( Args: expression: Argument to perform bitwise calculation on n: Index of value to return. Starts at 1. - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True order_by: Set the ordering of the expression to evaluate null_treatment: Assign whether to respect or ignull null values. """ @@ -1997,7 +2153,7 @@ def bit_and(expression: Expr, filter: Optional[Expr] = None) -> Expr: Args: expression: Argument to perform bitwise calculation on - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.bit_and(expression.expr, filter=filter_raw)) @@ -2013,7 +2169,7 @@ def bit_or(expression: Expr, filter: Optional[Expr] = None) -> Expr: Args: expression: Argument to perform bitwise calculation on - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.bit_or(expression.expr, filter=filter_raw)) @@ -2032,7 +2188,7 @@ def bit_xor( Args: expression: Argument to perform bitwise calculation on distinct: If True, evaluate each unique value of expression only once - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.bit_xor(expression.expr, distinct=distinct, filter=filter_raw)) @@ -2049,7 +2205,7 @@ def bool_and(expression: Expr, filter: Optional[Expr] = None) -> Expr: Args: expression: Argument to perform calculation on - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.bool_and(expression.expr, filter=filter_raw)) @@ -2066,7 +2222,7 @@ def bool_or(expression: Expr, filter: Optional[Expr] = None) -> Expr: Args: expression: Argument to perform calculation on - filter: If provided, only compute against rows for which the filter is true + filter: If provided, only compute against rows for which the filter is True """ filter_raw = filter.expr if filter is not None else None return Expr(f.bool_or(expression.expr, filter=filter_raw)) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 529f98cb..b930c95c 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -361,7 +361,6 @@ def test_bit_and_bool_fns(df, name, expr, result): ) def test_first_last_value(df_partitioned, name, expr, result) -> None: df = df_partitioned.aggregate([column("c")], [expr.alias(name)]).sort(column("c")) - df.show() expected = { "c": ["A", "B"], diff --git a/python/datafusion/tests/test_functions.py b/python/datafusion/tests/test_functions.py index bc5d50cc..8e3c5139 100644 --- a/python/datafusion/tests/test_functions.py +++ b/python/datafusion/tests/test_functions.py @@ -912,17 +912,64 @@ def test_regr_funcs_sql_2(): @pytest.mark.parametrize( "func, expected", [ - pytest.param(f.regr_slope, pa.array([2], type=pa.float64()), id="regr_slope"), + pytest.param(f.regr_slope(column("c2"), column("c1")), [4.6], id="regr_slope"), pytest.param( - f.regr_intercept, pa.array([0], type=pa.float64()), id="regr_intercept" + f.regr_slope(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [8], + id="regr_slope_filter", + ), + pytest.param( + f.regr_intercept(column("c2"), column("c1")), [-4], id="regr_intercept" + ), + pytest.param( + f.regr_intercept( + column("c2"), column("c1"), filter=column("c1") > literal(2) + ), + [-16], + id="regr_intercept_filter", + ), + pytest.param(f.regr_count(column("c2"), column("c1")), [4], id="regr_count"), + pytest.param( + f.regr_count(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [2], + id="regr_count_filter", + ), + pytest.param(f.regr_r2(column("c2"), column("c1")), [0.92], id="regr_r2"), + pytest.param( + f.regr_r2(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [1.0], + id="regr_r2_filter", + ), + pytest.param(f.regr_avgx(column("c2"), column("c1")), [2.5], id="regr_avgx"), + pytest.param( + f.regr_avgx(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [3.5], + id="regr_avgx_filter", + ), + pytest.param(f.regr_avgy(column("c2"), column("c1")), [7.5], id="regr_avgy"), + pytest.param( + f.regr_avgy(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [12.0], + id="regr_avgy_filter", + ), + pytest.param(f.regr_sxx(column("c2"), column("c1")), [5.0], id="regr_sxx"), + pytest.param( + f.regr_sxx(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [0.5], + id="regr_sxx_filter", + ), + pytest.param(f.regr_syy(column("c2"), column("c1")), [115.0], id="regr_syy"), + pytest.param( + f.regr_syy(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [32.0], + id="regr_syy_filter", + ), + pytest.param(f.regr_sxy(column("c2"), column("c1")), [23.0], id="regr_sxy"), + pytest.param( + f.regr_sxy(column("c2"), column("c1"), filter=column("c1") > literal(2)), + [4.0], + id="regr_sxy_filter", ), - pytest.param(f.regr_count, pa.array([3], type=pa.uint64()), id="regr_count"), - pytest.param(f.regr_r2, pa.array([1], type=pa.float64()), id="regr_r2"), - pytest.param(f.regr_avgx, pa.array([2], type=pa.float64()), id="regr_avgx"), - pytest.param(f.regr_avgy, pa.array([4], type=pa.float64()), id="regr_avgy"), - pytest.param(f.regr_sxx, pa.array([2], type=pa.float64()), id="regr_sxx"), - pytest.param(f.regr_syy, pa.array([8], type=pa.float64()), id="regr_syy"), - pytest.param(f.regr_sxy, pa.array([4], type=pa.float64()), id="regr_sxy"), ], ) def test_regr_funcs_df(func, expected): @@ -932,14 +979,18 @@ def test_regr_funcs_df(func, expected): ctx = SessionContext() # Create a DataFrame - data = {"column1": [1, 2, 3], "column2": [2, 4, 6]} + data = {"c1": [1, 2, 3, 4, 5, None], "c2": [2, 4, 8, 16, None, 64]} df = ctx.from_pydict(data, name="test_table") # Perform the regression function using DataFrame API - result_df = df.aggregate([], [func(f.col("column2"), f.col("column1"))]).collect() + df = df.aggregate([], [func.alias("result")]) + df.show() + + expected_dict = { + "result": expected, + } - # Assertion for DataFrame API result - assert result_df[0].column(0) == expected + assert df.collect()[0].to_pydict() == expected_dict def test_binary_string_functions(df): diff --git a/src/functions.rs b/src/functions.rs index d9a0c511..f6e001dc 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -127,96 +127,6 @@ pub fn var_pop(expression: PyExpr, distinct: bool) -> PyResult { } } -#[pyfunction] -pub fn regr_avgx(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_avgx(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_avgy(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_avgy(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_count(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_count(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_intercept(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_intercept(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_r2(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_r2(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_slope(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_slope(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_sxx(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_sxx(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_sxy(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_sxy(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn regr_syy(expr_y: PyExpr, expr_x: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::regr_syy(expr_y.expr, expr_x.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - fn add_builder_fns_to_aggregate( agg_fn: Expr, distinct: Option, @@ -784,6 +694,15 @@ aggregate_function!(count); aggregate_function!(covar_samp, y x); aggregate_function!(covar_pop, y x); aggregate_function!(median); +aggregate_function!(regr_slope, y x); +aggregate_function!(regr_intercept, y x); +aggregate_function!(regr_count, y x); +aggregate_function!(regr_r2, y x); +aggregate_function!(regr_avgx, y x); +aggregate_function!(regr_avgy, y x); +aggregate_function!(regr_sxx, y x); +aggregate_function!(regr_syy, y x); +aggregate_function!(regr_sxy, y x); // Code is commented out since grouping is not yet implemented // https://github.com/apache/datafusion-python/issues/861 From 2325223f90f5889a0a783a31fabc02c15c1aa7bb Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 08:15:14 -0400 Subject: [PATCH 26/35] Update stddev and stddev_pop to use filter and macro --- python/datafusion/functions.py | 34 ++++++++++++++++----- python/datafusion/tests/test_aggregation.py | 10 ++++++ src/functions.rs | 22 ++----------- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 035d3dc8..33530e2d 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1801,22 +1801,40 @@ def sum(arg: Expr) -> Expr: return Expr(f.sum(arg.expr)) -def stddev(arg: Expr, distinct: bool = False) -> Expr: - """Computes the standard deviation of the argument.""" - return Expr(f.stddev(arg.expr, distinct=distinct)) +def stddev(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the standard deviation of the argument. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: The value to find the minimum of + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.stddev(expression.expr, filter=filter_raw)) -def stddev_pop(arg: Expr, distinct: bool = False) -> Expr: - """Computes the population standard deviation of the argument.""" - return Expr(f.stddev_pop(arg.expr, distinct=distinct)) +def stddev_pop(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the population standard deviation of the argument. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: The value to find the minimum of + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.stddev_pop(expression.expr, filter=filter_raw)) -def stddev_samp(arg: Expr, distinct: bool = False) -> Expr: +def stddev_samp(arg: Expr, filter: Optional[Expr] = None) -> Expr: """Computes the sample standard deviation of the argument. This is an alias for :py:func:`stddev`. """ - return stddev(arg, distinct) + return stddev(arg, filter=filter) def var(arg: Expr) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index b930c95c..565cb3df 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -163,6 +163,16 @@ def test_aggregation_stats(df, agg_expr, calc_expected): (f.count_star(filter=column("a") != 3), pa.array([2]), False), (f.max(column("a"), filter=column("a") != lit(3)), pa.array([2]), False), (f.min(column("a"), filter=column("a") != lit(1)), pa.array([2]), False), + ( + f.stddev(column("a"), filter=column("a") != lit(2)), + pa.array([np.sqrt(2)]), + False, + ), + ( + f.stddev_pop(column("a"), filter=column("a") != lit(2)), + pa.array([1.0]), + False, + ), ], ) def test_aggregation(df, agg_expr, expected, array_sort): diff --git a/src/functions.rs b/src/functions.rs index f6e001dc..77e251d2 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -92,26 +92,6 @@ pub fn sum(args: PyExpr) -> PyExpr { functions_aggregate::expr_fn::sum(args.expr).into() } -#[pyfunction] -pub fn stddev(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::stddev(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - -#[pyfunction] -pub fn stddev_pop(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::stddev_pop(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - #[pyfunction] pub fn var_samp(expression: PyExpr) -> PyExpr { functions_aggregate::expr_fn::var_sample(expression.expr).into() @@ -703,6 +683,8 @@ aggregate_function!(regr_avgy, y x); aggregate_function!(regr_sxx, y x); aggregate_function!(regr_syy, y x); aggregate_function!(regr_sxy, y x); +aggregate_function!(stddev); +aggregate_function!(stddev_pop); // Code is commented out since grouping is not yet implemented // https://github.com/apache/datafusion-python/issues/861 From 6be20946f59e77bc7d2e0391cf0e69fe3daa3880 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 08:19:30 -0400 Subject: [PATCH 27/35] Expose string_agg --- src/functions.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/functions.rs b/src/functions.rs index 77e251d2..ea107bf2 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -722,6 +722,20 @@ pub fn nth_value( add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) } +// string_agg requires a non-expr argument +#[pyfunction] +pub fn string_agg( + expr: PyExpr, + delimiter: String, + distinct: Option, + filter: Option, + order_by: Option>, + null_treatment: Option, +) -> PyResult { + let agg_fn = datafusion::functions_aggregate::string_agg::string_agg(expr.expr, lit(delimiter)); + add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment) +} + fn add_builder_fns_to_window( window_fn: Expr, partition_by: Option>, @@ -939,6 +953,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(starts_with))?; m.add_wrapped(wrap_pyfunction!(stddev))?; m.add_wrapped(wrap_pyfunction!(stddev_pop))?; + m.add_wrapped(wrap_pyfunction!(string_agg))?; m.add_wrapped(wrap_pyfunction!(strpos))?; m.add_wrapped(wrap_pyfunction!(r#struct))?; // Use raw identifier since struct is a keyword m.add_wrapped(wrap_pyfunction!(substr))?; From 6420f07639df77df45026fc66d2453e7e7532e58 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 08:32:49 -0400 Subject: [PATCH 28/35] Add string_agg to python wrappers and add unit test --- python/datafusion/functions.py | 35 ++++++++++++++++++++ python/datafusion/tests/test_aggregation.py | 36 +++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 33530e2d..75e62db9 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -223,6 +223,7 @@ "stddev", "stddev_pop", "stddev_samp", + "string_agg", "strpos", "struct", "substr", @@ -2580,3 +2581,37 @@ def ntile( order_by=order_cols, ) ) + + +def string_agg( + expression: Expr, + delimiter: str, + filter: Optional[Expr] = None, + order_by: Optional[list[Expr]] = None, +) -> Expr: + """Concatenates the input strings. + + This aggregate function will concatenate input strings, ignoring null values, and + seperating them with the specified delimiter. Non-string values will be converted to + their string equivalents. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``distinct`` and ``null_treatment``. + + Args: + expression: Argument to perform bitwise calculation on + delimiter: Text to place between each value of expression + filter: If provided, only compute against rows for which the filter is True + order_by: Set the ordering of the expression to evaluate + """ + order_by_raw = expr_list_to_raw_expr_list(order_by) + filter_raw = filter.expr if filter is not None else None + + return Expr( + f.string_agg( + expression.expr, + delimiter, + filter=filter_raw, + order_by=order_by_raw, + ) + ) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 565cb3df..4ebd26fb 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -378,3 +378,39 @@ def test_first_last_value(df_partitioned, name, expr, result) -> None: } assert df.collect()[0].to_pydict() == expected + + +@pytest.mark.parametrize( + "name,expr,result", + [ + ("string_agg", f.string_agg(column("a"), ","), "one,two,three,two"), + ("string_agg", f.string_agg(column("b"), ""), "03124"), + ( + "string_agg", + f.string_agg(column("a"), ",", filter=column("b") != lit(3)), + "one,three,two", + ), + ( + "string_agg", + f.string_agg(column("a"), ",", order_by=[column("b")]), + "one,three,two,two", + ), + ], +) +def test_string_agg(name, expr, result) -> None: + ctx = SessionContext() + + df = ctx.from_pydict( + { + "a": ["one", "two", None, "three", "two"], + "b": [0, 3, 1, 2, 4], + } + ) + + df = df.aggregate([], [expr.alias(name)]) + + expected = { + name: [result], + } + df.show() + assert df.collect()[0].to_pydict() == expected From 529de8833eae04d658d7f3c415dcc16e0ac68f0e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 08:35:57 -0400 Subject: [PATCH 29/35] Switch sum to use macro in rust side and expose correct options in python wrapper --- python/datafusion/functions.py | 20 +++++++++++++++++--- python/datafusion/tests/test_aggregation.py | 1 + src/functions.rs | 6 +----- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 75e62db9..0186e586 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -1797,9 +1797,23 @@ def min(expression: Expr, filter: Optional[Expr] = None) -> Expr: return Expr(f.min(expression.expr, filter=filter_raw)) -def sum(arg: Expr) -> Expr: - """Computes the sum of a set of numbers.""" - return Expr(f.sum(arg.expr)) +def sum( + expression: Expr, + filter: Optional[Expr] = None, +) -> Expr: + """Computes the sum of a set of numbers. + + This aggregate function expects a numeric expression. + + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: Values to combine into an array + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.sum(expression.expr, filter=filter_raw)) def stddev(expression: Expr, filter: Optional[Expr] = None) -> Expr: diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 4ebd26fb..2ef944e6 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -156,6 +156,7 @@ def test_aggregation_stats(df, agg_expr, calc_expected): False, ), (f.avg(column("b"), filter=column("a") != lit(1)), pa.array([5.0]), False), + (f.sum(column("b"), filter=column("a") != lit(1)), pa.array([10]), False), (f.count(column("b"), distinct=True), pa.array([2]), False), (f.count(column("b"), filter=column("a") != 3), pa.array([2]), False), (f.count(), pa.array([3]), False), diff --git a/src/functions.rs b/src/functions.rs index ea107bf2..ecfd4296 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -87,11 +87,6 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -#[pyfunction] -pub fn sum(args: PyExpr) -> PyExpr { - functions_aggregate::expr_fn::sum(args.expr).into() -} - #[pyfunction] pub fn var_samp(expression: PyExpr) -> PyExpr { functions_aggregate::expr_fn::var_sample(expression.expr).into() @@ -664,6 +659,7 @@ aggregate_function!(array_agg); aggregate_function!(max); aggregate_function!(min); aggregate_function!(avg); +aggregate_function!(sum); aggregate_function!(bit_and); aggregate_function!(bit_or); aggregate_function!(bit_xor); From e352ee3c79a6217646620012fdbf52a14c3b916a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 08:46:01 -0400 Subject: [PATCH 30/35] Use macro for exposing var_pop and var_samp --- python/datafusion/functions.py | 43 +++++++++++++++++++++++++++------- src/functions.rs | 19 +++------------ 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/python/datafusion/functions.py b/python/datafusion/functions.py index 0186e586..163ff04e 100644 --- a/python/datafusion/functions.py +++ b/python/datafusion/functions.py @@ -246,6 +246,7 @@ "var", "var_pop", "var_samp", + "var_sample", "when", # Window Functions "window", @@ -1852,22 +1853,48 @@ def stddev_samp(arg: Expr, filter: Optional[Expr] = None) -> Expr: return stddev(arg, filter=filter) -def var(arg: Expr) -> Expr: +def var(expression: Expr, filter: Optional[Expr] = None) -> Expr: """Computes the sample variance of the argument. This is an alias for :py:func:`var_samp`. """ - return var_samp(arg) + return var_samp(expression, filter) -def var_pop(arg: Expr, distinct: bool = False) -> Expr: - """Computes the population variance of the argument.""" - return Expr(f.var_pop(arg.expr, distinct=distinct)) +def var_pop(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the population variance of the argument. + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: The variable to compute the variance for + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.var_pop(expression.expr, filter=filter_raw)) + + +def var_samp(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the sample variance of the argument. -def var_samp(arg: Expr) -> Expr: - """Computes the sample variance of the argument.""" - return Expr(f.var_samp(arg.expr)) + If using the builder functions described in ref:`_aggregation` this function ignores + the options ``order_by``, ``null_treatment``, and ``distinct``. + + Args: + expression: The variable to compute the variance for + filter: If provided, only compute against rows for which the filter is True + """ + filter_raw = filter.expr if filter is not None else None + return Expr(f.var_sample(expression.expr, filter=filter_raw)) + + +def var_sample(expression: Expr, filter: Optional[Expr] = None) -> Expr: + """Computes the sample variance of the argument. + + This is an alias for :py:func:`var_samp`. + """ + return var_samp(expression, filter) def regr_avgx( diff --git a/src/functions.rs b/src/functions.rs index ecfd4296..6d593e3f 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -87,21 +87,6 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -#[pyfunction] -pub fn var_samp(expression: PyExpr) -> PyExpr { - functions_aggregate::expr_fn::var_sample(expression.expr).into() -} - -#[pyfunction] -pub fn var_pop(expression: PyExpr, distinct: bool) -> PyResult { - let expr = functions_aggregate::expr_fn::var_pop(expression.expr); - if distinct { - Ok(expr.distinct().build()?.into()) - } else { - Ok(expr.into()) - } -} - fn add_builder_fns_to_aggregate( agg_fn: Expr, distinct: Option, @@ -681,6 +666,8 @@ aggregate_function!(regr_syy, y x); aggregate_function!(regr_sxy, y x); aggregate_function!(stddev); aggregate_function!(stddev_pop); +aggregate_function!(var_sample); +aggregate_function!(var_pop); // Code is commented out since grouping is not yet implemented // https://github.com/apache/datafusion-python/issues/861 @@ -971,7 +958,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(upper))?; m.add_wrapped(wrap_pyfunction!(self::uuid))?; // Use self to avoid name collision m.add_wrapped(wrap_pyfunction!(var_pop))?; - m.add_wrapped(wrap_pyfunction!(var_samp))?; + m.add_wrapped(wrap_pyfunction!(var_sample))?; m.add_wrapped(wrap_pyfunction!(window))?; m.add_wrapped(wrap_pyfunction!(regr_avgx))?; m.add_wrapped(wrap_pyfunction!(regr_avgy))?; From 18574687a5caa433d2ba4722f2e05746f49b3b50 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 08:50:44 -0400 Subject: [PATCH 31/35] Add unit tests for filtering on var_pop and var_samp --- python/datafusion/tests/test_aggregation.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/datafusion/tests/test_aggregation.py b/python/datafusion/tests/test_aggregation.py index 2ef944e6..243a8c3c 100644 --- a/python/datafusion/tests/test_aggregation.py +++ b/python/datafusion/tests/test_aggregation.py @@ -237,6 +237,26 @@ def test_aggregation(df, agg_expr, expected, array_sort): f.covar_samp(column("c3"), column("c2"), filter=column("c3") > lit(0)), [-10.5714, 9.9636, 15.1273, -10.9636, 1.2417], ), + ( + "var_samp", + f.var_samp(column("c2")), + [1.9286, 2.2047, 1.6333, 2.1438, 1.6], + ), + ( + "var_samp_w_filter", + f.var_samp(column("c2"), filter=column("c3") > lit(0)), + [1.4286, 2.4182, 1.8545, 1.4727, 1.6292], + ), + ( + "var_pop", + f.var_pop(column("c2")), + [1.8367, 2.0886, 1.5556, 2.0247, 1.5238], + ), + ( + "var_pop_w_filter", + f.var_pop(column("c2"), filter=column("c3") > lit(0)), + [1.25, 2.1983, 1.686, 1.3388, 1.5273], + ), ], ) def test_aggregate_100(df_aggregate_100, name, expr, expected): From 7148dcbf0e5bc01798df80d1bc126ee673420194 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 10:32:34 -0400 Subject: [PATCH 32/35] Move approximation functions to use macro when possible --- src/functions.rs | 84 +++++++++++++++++++++--------------------------- 1 file changed, 36 insertions(+), 48 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index 6d593e3f..b6002bc1 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -39,54 +39,6 @@ use datafusion::logical_expr::{ lit, Expr, WindowFunctionDefinition, }; -#[pyfunction] -pub fn approx_distinct(expression: PyExpr, filter: Option) -> PyResult { - let agg_fn = functions_aggregate::expr_fn::approx_distinct(expression.expr); - - add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) -} - -#[pyfunction] -pub fn approx_median(expression: PyExpr, filter: Option) -> PyResult { - let agg_fn = functions_aggregate::expr_fn::approx_median(expression.expr); - - add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) -} - -#[pyfunction] -pub fn approx_percentile_cont( - expression: PyExpr, - percentile: f64, - num_centroids: Option, // enforces optional arguments at the end, currently - filter: Option, -) -> PyResult { - let args = if let Some(num_centroids) = num_centroids { - vec![expression.expr, lit(percentile), lit(num_centroids)] - } else { - vec![expression.expr, lit(percentile)] - }; - let udaf = functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf(); - let agg_fn = udaf.call(args); - - add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) -} - -#[pyfunction] -pub fn approx_percentile_cont_with_weight( - expression: PyExpr, - weight: PyExpr, - percentile: f64, - filter: Option, -) -> PyResult { - let agg_fn = functions_aggregate::expr_fn::approx_percentile_cont_with_weight( - expression.expr, - weight.expr, - lit(percentile), - ); - - add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) -} - fn add_builder_fns_to_aggregate( agg_fn: Expr, distinct: Option, @@ -668,11 +620,47 @@ aggregate_function!(stddev); aggregate_function!(stddev_pop); aggregate_function!(var_sample); aggregate_function!(var_pop); +aggregate_function!(approx_distinct); +aggregate_function!(approx_median); // Code is commented out since grouping is not yet implemented // https://github.com/apache/datafusion-python/issues/861 // aggregate_function!(grouping); +#[pyfunction] +pub fn approx_percentile_cont( + expression: PyExpr, + percentile: f64, + num_centroids: Option, // enforces optional arguments at the end, currently + filter: Option, +) -> PyResult { + let args = if let Some(num_centroids) = num_centroids { + vec![expression.expr, lit(percentile), lit(num_centroids)] + } else { + vec![expression.expr, lit(percentile)] + }; + let udaf = functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf(); + let agg_fn = udaf.call(args); + + add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) +} + +#[pyfunction] +pub fn approx_percentile_cont_with_weight( + expression: PyExpr, + weight: PyExpr, + percentile: f64, + filter: Option, +) -> PyResult { + let agg_fn = functions_aggregate::expr_fn::approx_percentile_cont_with_weight( + expression.expr, + weight.expr, + lit(percentile), + ); + + add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) +} + aggregate_function_vec_args!(last_value); // We handle first_value explicitly because the signature expects an order_by From b55ff88305e03c03e25892db8137a2a61a2d0175 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 10:34:13 -0400 Subject: [PATCH 33/35] Update user documentation to explain in detail the options for aggregate functions --- .../common-operations/aggregations.rst | 206 ++++++++++++++++-- 1 file changed, 184 insertions(+), 22 deletions(-) diff --git a/docs/source/user-guide/common-operations/aggregations.rst b/docs/source/user-guide/common-operations/aggregations.rst index 7ad40221..8fee26a1 100644 --- a/docs/source/user-guide/common-operations/aggregations.rst +++ b/docs/source/user-guide/common-operations/aggregations.rst @@ -20,43 +20,205 @@ Aggregation ============ -An aggregate or aggregation is a function where the values of multiple rows are processed together to form a single summary value. -For performing an aggregation, DataFusion provides the :py:func:`~datafusion.dataframe.DataFrame.aggregate` +An aggregate or aggregation is a function where the values of multiple rows are processed together +to form a single summary value. For performing an aggregation, DataFusion provides the +:py:func:`~datafusion.dataframe.DataFrame.aggregate` .. ipython:: python + import urllib.request from datafusion import SessionContext - from datafusion import column, lit + from datafusion import col, lit from datafusion import functions as f - import random - ctx = SessionContext() - df = ctx.from_pydict( - { - "a": ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"], - "b": ["one", "one", "two", "three", "two", "two", "one", "three"], - "c": [random.randint(0, 100) for _ in range(8)], - "d": [random.random() for _ in range(8)], - }, - name="foo_bar" + urllib.request.urlretrieve( + "https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv", + "pokemon.csv", ) - col_a = column("a") - col_b = column("b") - col_c = column("c") - col_d = column("d") + ctx = SessionContext() + df = ctx.read_csv("pokemon.csv") + + col_type_1 = col('"Type 1"') + col_type_2 = col('"Type 2"') + col_speed = col('"Speed"') + col_attack = col('"Attack"') - df.aggregate([], [f.approx_distinct(col_c), f.approx_median(col_d), f.approx_percentile_cont(col_d, lit(0.5))]) + df.aggregate([col_type_1], [ + f.approx_distinct(col_speed).alias("Count"), + f.approx_median(col_speed).alias("Median Speed"), + f.approx_percentile_cont(col_speed, 0.9).alias("90% Speed")]) -When the :code:`group_by` list is empty the aggregation is done over the whole :class:`.DataFrame`. For grouping -the :code:`group_by` list must contain at least one column +When the :code:`group_by` list is empty the aggregation is done over the whole :class:`.DataFrame`. +For grouping the :code:`group_by` list must contain at least one column. .. ipython:: python - df.aggregate([col_a], [f.sum(col_c), f.max(col_d), f.min(col_d)]) + df.aggregate([col_type_1], [ + f.max(col_speed).alias("Max Speed"), + f.avg(col_speed).alias("Avg Speed"), + f.min(col_speed).alias("Min Speed")]) More than one column can be used for grouping .. ipython:: python - df.aggregate([col_a, col_b], [f.sum(col_c), f.max(col_d), f.min(col_d)]) + df.aggregate([col_type_1, col_type_2], [ + f.max(col_speed).alias("Max Speed"), + f.avg(col_speed).alias("Avg Speed"), + f.min(col_speed).alias("Min Speed")]) + + + +Setting Parameters +------------------ + +Each of the built in aggregate functions provides arguments for the parameters that affect their +operation. These can also be overridden using the builder approach to setting any of the following +parameters. When you use the builder, you must call ``build()`` to finish. For example, these two +expressions are equivalent. + +.. ipython:: python + + first_1 = f.first_value(col("a"), order_by=[col("a")]) + first_2 = f.first_value(col("a")).order_by(col("a")).build() + +Ordering +^^^^^^^^ + +You can control the order in which rows are processed by window functions by providing +a list of ``order_by`` functions for the ``order_by`` parameter. In the following example, we +sort the Pokemon by their attack in increasing order and take the first value, which gives us the +Pokemon with the smallest attack value in each ``Type 1``. + +.. ipython:: python + + df.aggregate( + [col('"Type 1"')], + [f.first_value( + col('"Name"'), + order_by=[col('"Attack"').sort(ascending=True)] + ).alias("Smallest Attack") + ]) + +Distinct +^^^^^^^^ + +When you set the parameter ``distinct`` to ``True``, then unique values will only be evaluated one +time each. Suppose we want to create an array of all of the ``Type 2`` for each ``Type 1`` of our +Pokemon set. Since there will be many entries of ``Type 2`` we only one each distinct value. + +.. ipython:: python + + df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")]) + +In the output of the above we can see that there are some ``Type 1`` for which the ``Type 2`` entry +is ``null``. In reality, we probably want to filter those out. We can do this in two ways. First, +we can filter DataFrame rows that have no ``Type 2``. If we do this, we might have some ``Type 1`` +entries entirely removed. The second is we can use the ``filter`` argument described below. + +.. ipython:: python + + df.filter(col_type_2.is_not_null()).aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")]) + + df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True, filter=col_type_2.is_not_null()).alias("Type 2 List")]) + +Which approach you take should depend on your use case. + +Null Treatment +^^^^^^^^^^^^^^ + +This option allows you to either respect or ignore null values. + +One common usage for handling nulls is the case where you want to find the first value within a +partition. By setting the null treatment to ignore nulls, we can find the first non-null value +in our partition. + + +.. ipython:: python + + from datafusion.common import NullTreatment + + df.aggregate([col_type_1], [ + f.first_value( + col_type_2, + order_by=[col_attack], + null_treatment=NullTreatment.RESPECT_NULLS + ).alias("Lowest Attack Type 2")]) + + df.aggregate([col_type_1], [ + f.first_value( + col_type_2, + order_by=[col_attack], + null_treatment=NullTreatment.IGNORE_NULLS + ).alias("Lowest Attack Type 2")]) + +Filter +^^^^^^ + +Using the filter option is useful for filtering results to include in the aggregate function. It can +be seen in the example above on how this can be useful to only filter rows evaluated by the +aggregate function without filtering rows from the entire DataFrame. + +Filter takes a single expression. + +Suppose we want to find the speed values for only Pokemon that have low Attack values. + +.. ipython:: python + + df.aggregate([col_type_1], [ + f.avg(col_speed).alias("Avg Speed All"), + f.avg(col_speed, filter=col_attack < lit(50)).alias("Avg Speed Low Attack")]) + + +Aggregate Functions +------------------- + +The available aggregate functions are: + +1. Comparison Functions + - :py:func:`datafusion.functions.min` + - :py:func:`datafusion.functions.max` +2. Math Functions + - :py:func:`datafusion.functions.sum` + - :py:func:`datafusion.functions.avg` + - :py:func:`datafusion.functions.median` +3. Array Functions + - :py:func:`datafusion.functions.array_agg` +4. Logical Functions + - :py:func:`datafusion.functions.bit_and` + - :py:func:`datafusion.functions.bit_or` + - :py:func:`datafusion.functions.bit_xor` + - :py:func:`datafusion.functions.bool_and` + - :py:func:`datafusion.functions.bool_or` +5. Statistical Functions + - :py:func:`datafusion.functions.count` + - :py:func:`datafusion.functions.corr` + - :py:func:`datafusion.functions.covar_samp` + - :py:func:`datafusion.functions.covar_pop` + - :py:func:`datafusion.functions.stddev` + - :py:func:`datafusion.functions.stddev_pop` + - :py:func:`datafusion.functions.var_samp` + - :py:func:`datafusion.functions.var_pop` +6. Linear Regression Functions + - :py:func:`datafusion.functions.regr_count` + - :py:func:`datafusion.functions.regr_slope` + - :py:func:`datafusion.functions.regr_intercept` + - :py:func:`datafusion.functions.regr_r2` + - :py:func:`datafusion.functions.regr_avgx` + - :py:func:`datafusion.functions.regr_avgy` + - :py:func:`datafusion.functions.regr_sxx` + - :py:func:`datafusion.functions.regr_syy` + - :py:func:`datafusion.functions.regr_slope` +7. Positional Functions + - :py:func:`datafusion.functions.first_value` + - :py:func:`datafusion.functions.last_value` + - :py:func:`datafusion.functions.nth_value` +8. String Functions + - :py:func:`datafusion.functions.string_agg` +9. Approximation Functions + - :py:func:`datafusion.functions.approx_distinct` + - :py:func:`datafusion.functions.approx_median` + - :py:func:`datafusion.functions.approx_percentile_cont` + - :py:func:`datafusion.functions.approx_percentile_cont_with_weight` + From ba09df1681bfdaf88e6b4a40c06939c6f7af8949 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 8 Sep 2024 10:39:40 -0400 Subject: [PATCH 34/35] Update unit test to handle Python 3.10 --- python/datafusion/tests/test_wrapper_coverage.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/datafusion/tests/test_wrapper_coverage.py b/python/datafusion/tests/test_wrapper_coverage.py index 8c371638..4a47de2e 100644 --- a/python/datafusion/tests/test_wrapper_coverage.py +++ b/python/datafusion/tests/test_wrapper_coverage.py @@ -19,7 +19,12 @@ import datafusion.functions import datafusion.object_store import datafusion.substrait -from enum import EnumType + +# EnumType introduced in 3.11. 3.10 and prior it was called EnumMeta. +try: + from enum import EnumType +except ImportError: + from enum import EnumMeta as EnumType def missing_exports(internal_obj, wrapped_obj) -> None: From 62ab0ea1452aaffd8bf71b21a822c8f734a8fef7 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 9 Sep 2024 13:42:05 -0400 Subject: [PATCH 35/35] Clean up commented code --- src/functions.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/functions.rs b/src/functions.rs index b6002bc1..b9ca6301 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -47,10 +47,7 @@ fn add_builder_fns_to_aggregate( null_treatment: Option, ) -> PyResult { // Since ExprFuncBuilder::new() is private, we can guarantee initializing - // a builder with an `order_by` default of empty vec - // let order_by = order_by - // .map(|x| x.into_iter().map(|x| x.expr).collect::>()) - // .unwrap_or_default(); + // a builder with an `null_treatment` with option None let mut builder = agg_fn.null_treatment(None); if let Some(order_by_cols) = order_by { @@ -66,7 +63,6 @@ fn add_builder_fns_to_aggregate( builder = builder.filter(filter.expr); } - // would be nice if all the options builder methods accepted Option ... builder = builder.null_treatment(null_treatment.map(DFNullTreatment::from)); Ok(builder.build()?.into())