diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 3b903e1d6417..79185c76c6eb 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -861,12 +861,12 @@ def aggregate(self, *aggs: AggregateFn) -> U: Returns: If the input dataset is a simple dataset then the output is - a tuple of (agg1, agg2, ...) where each tuple element is + a tuple of ``(agg1, agg2, ...)`` where each tuple element is the corresponding aggregation result. If the input dataset is an Arrow dataset then the output is - an ArrowRow where each column is the corresponding + an ``ArrowRow`` where each column is the corresponding aggregation result. - If the dataset is empty, return None. + If the dataset is empty, return ``None``. """ ret = self.groupby(None).aggregate(*aggs).take(1) return ret[0] if len(ret) > 0 else None @@ -957,6 +957,7 @@ def _is_arrow_dataset(self) -> bool: def _aggregate_on(self, agg_cls: type, on: AggregateOnT, *args, **kwargs): """Helper for aggregating on a particular subset of the dataset. + This validates the `on` argument, and converts a list of column names or lambdas to a multi-aggregation. A null `on` results in a multi-aggregation on all columns for an Arrow Dataset, and a single @@ -983,16 +984,49 @@ def _build_multicolumn_aggs(self, def sum(self, on: AggregateOnT = None) -> U: """Compute sum over entire dataset. + This is a blocking operation. + Examples: >>> ray.data.range(100).sum() + >>> ray.data.from_items([ + ... (i, i**2) + ... for i in range(100)]).sum(lambda x: x[1]) >>> ray.data.range_arrow(100).sum("value") + >>> ray.data.from_items([ + ... {"A": i, "B": i**2} + ... for i in range(100)]).sum(["A", "B"]) Args: - on: The data to sum on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the sum. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to return a scalar sum of all rows. 
+ - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to return an ``ArrowRow`` + containing the column-wise sum of all columns. Returns: The sum result. + + For a simple dataset, the output is: + + - ``on=None``: a scalar representing the sum of all rows, + - ``on=callable``: a scalar representing the sum of the outputs of + the callable called on each row, + - ``on=[callable_1, ..., callable_n]``: a tuple of + ``(sum_1, ..., sum_n)`` representing the sum of the outputs of + the corresponding callables called on each row. + + For an Arrow dataset, the output is: + + - ``on=None``: an ``ArrowRow`` containing the column-wise sum of + all columns, + - ``on="col"``: a scalar representing the sum of all items in + column ``"col"``, + - ``on=["col_1", ..., "col_n"]``: an n-column ``ArrowRow`` + containing the column-wise sum of the provided columns. + + If the dataset is empty, then the output is 0. """ ret = self._aggregate_on(Sum, on) if ret is None: @@ -1005,16 +1039,49 @@ def sum(self, on: AggregateOnT = None) -> U: def min(self, on: AggregateOnT = None) -> U: """Compute minimum over entire dataset. + This is a blocking operation. + Examples: >>> ray.data.range(100).min() + >>> ray.data.from_items([ + ... (i, i**2) + ... for i in range(100)]).min(lambda x: x[1]) >>> ray.data.range_arrow(100).min("value") + >>> ray.data.from_items([ + ... {"A": i, "B": i**2} + ... for i in range(100)]).min(["A", "B"]) Args: - on: The data to min on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the min. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to return a scalar min of all rows. 
+ + For a simple dataset, the output is: + + - ``on=None``: a scalar representing the min of all rows, + - ``on=callable``: a scalar representing the min of the outputs + of the callable called on each row, + - ``on=[callable_1, ..., callable_n]``: a tuple of + ``(min_1, ..., min_n)`` representing the min of the outputs + of the corresponding callables called on each row. + + For an Arrow dataset, the output is: + + - ``on=None``: an ``ArrowRow`` containing the column-wise min of + all columns, + - ``on="col"``: a scalar representing the min of all items in + column ``"col"``, + - ``on=["col_1", ..., "col_n"]``: an n-column ``ArrowRow`` + containing the column-wise min of the provided columns. + + If the dataset is empty, then a ``ValueError`` is raised. """ ret = self._aggregate_on(Min, on) if ret is None: @@ -1027,16 +1094,49 @@ def min(self, on: AggregateOnT = None) -> U: def max(self, on: AggregateOnT = None) -> U: """Compute maximum over entire dataset. + This is a blocking operation. + Examples: >>> ray.data.range(100).max() + >>> ray.data.from_items([ + ... (i, i**2) + ... for i in range(100)]).max(lambda x: x[1]) >>> ray.data.range_arrow(100).max("value") + >>> ray.data.from_items([ + ... {"A": i, "B": i**2} + ... for i in range(100)]).max(["A", "B"]) Args: - on: The data to max on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the max. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to return a scalar max of all rows. + - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to return an ``ArrowRow`` + containing the column-wise max of all columns. Returns: The max result. 
+ + For a simple dataset, the output is: + + - ``on=None``: a scalar representing the max of all rows, + - ``on=callable``: a scalar representing the max of the outputs of + the callable called on each row, + - ``on=[callable_1, ..., callable_n]``: a tuple of + ``(max_1, ..., max_n)`` representing the max of the outputs of + the corresponding callables called on each row. + + For an Arrow dataset, the output is: + + - ``on=None``: an ``ArrowRow`` containing the column-wise max of + all columns, + - ``on="col"``: a scalar representing the max of all items in + column ``"col"``, + - ``on=["col_1", ..., "col_n"]``: an n-column ``ArrowRow`` + containing the column-wise max of the provided columns. + + If the dataset is empty, then a ``ValueError`` is raised. """ ret = self._aggregate_on(Max, on) if ret is None: @@ -1049,16 +1149,49 @@ def max(self, on: AggregateOnT = None) -> U: def mean(self, on: AggregateOnT = None) -> U: """Compute mean over entire dataset. + This is a blocking operation. + Examples: >>> ray.data.range(100).mean() + >>> ray.data.from_items([ + ... (i, i**2) + ... for i in range(100)]).mean(lambda x: x[1]) >>> ray.data.range_arrow(100).mean("value") + >>> ray.data.from_items([ + ... {"A": i, "B": i**2} + ... for i in range(100)]).mean(["A", "B"]) Args: - on: The data to mean on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the mean. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to return a scalar mean of all rows. + - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to return an ``ArrowRow`` + containing the column-wise mean of all columns. Returns: The mean result. 
+ + For a simple dataset, the output is: + + - ``on=None``: a scalar representing the mean of all rows, + - ``on=callable``: a scalar representing the mean of the outputs + of the callable called on each row, + - ``on=[callable_1, ..., callable_n]``: a tuple of + ``(mean_1, ..., mean_n)`` representing the mean of the outputs + of the corresponding callables called on each row. + + For an Arrow dataset, the output is: + + - ``on=None``: an ``ArrowRow`` containing the column-wise mean of + all columns, + - ``on="col"``: a scalar representing the mean of all items in + column ``"col"``, + - ``on=["col_1", ..., "col_n"]``: an n-column ``ArrowRow`` + containing the column-wise mean of the provided columns. + + If the dataset is empty, then a ``ValueError`` is raised. """ ret = self._aggregate_on(Mean, on) if ret is None: @@ -1071,9 +1204,17 @@ def mean(self, on: AggregateOnT = None) -> U: def std(self, on: AggregateOnT = None, ddof: int = 1) -> U: """Compute standard deviation over entire dataset. + This is a blocking operation. + Examples: >>> ray.data.range(100).std() - >>> ray.data.range_arrow(100).std("value") + >>> ray.data.from_items([ + ... (i, i**2) + ... for i in range(100)]).std(lambda x: x[1]) + >>> ray.data.range_arrow(100).std("value", ddof=0) + >>> ray.data.from_items([ + ... {"A": i, "B": i**2} + ... for i in range(100)]).std(["A", "B"]) NOTE: This uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen due to @@ -1084,13 +1225,38 @@ def std(self, on: AggregateOnT = None, ddof: int = 1) -> U: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm Args: - on: The data on which to compute the standard deviation. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the std. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to return a scalar std of all rows. 
+ - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to return an ``ArrowRow`` + containing the column-wise std of all columns. ddof: Delta Degrees of Freedom. The divisor used in calculations - is N - ddof, where N represents the number of elements. + is ``N - ddof``, where ``N`` represents the number of elements. Returns: The standard deviation result. + + For a simple dataset, the output is: + + - ``on=None``: a scalar representing the std of all rows, + - ``on=callable``: a scalar representing the std of the outputs of + the callable called on each row, + - ``on=[callable_1, ..., callable_n]``: a tuple of + ``(std_1, ..., std_n)`` representing the std of the outputs of + the corresponding callables called on each row. + + For an Arrow dataset, the output is: + + - ``on=None``: an ``ArrowRow`` containing the column-wise std of + all columns, + - ``on="col"``: a scalar representing the std of all items in + column ``"col"``, + - ``on=["col_1", ..., "col_n"]``: an n-column ``ArrowRow`` + containing the column-wise std of the provided columns. + + If the dataset is empty, then a ``ValueError`` is raised. """ ret = self._aggregate_on(Std, on, ddof=ddof) if ret is None: diff --git a/python/ray/data/grouped_dataset.py b/python/ray/data/grouped_dataset.py index f0b0f2f10e7b..9e71c2d2e891 100644 --- a/python/ray/data/grouped_dataset.py +++ b/python/ray/data/grouped_dataset.py @@ -42,7 +42,7 @@ def __init__(self, dataset: Dataset[T], key: GroupKeyT): self._key = key def aggregate(self, *aggs: AggregateFn) -> Dataset[U]: - """Implements the accumulator-based aggregation. + """Implements an accumulator-based aggregation. This is a blocking operation. @@ -58,14 +58,14 @@ def aggregate(self, *aggs: AggregateFn) -> Dataset[U]: aggs: Aggregations to do. 
Returns: - If the input dataset is simple dataset then the output is - a simple dataset of (k, v_1, ..., v_n) tuples where k is the - groupby key and v_i is the result of the ith given aggregation. - If the input dataset is Arrow dataset then the output is - an Arrow dataset of n + 1 columns where first column is - the groupby key and the second through n + 1 columns are the + If the input dataset is a simple dataset then the output is a + simple dataset of ``(k, v_1, ..., v_n)`` tuples where ``k`` is the + groupby key and ``v_i`` is the result of the ith given aggregation. + If the input dataset is an Arrow dataset then the output is an + Arrow dataset of ``n + 1`` columns where the first column is the + groupby key and the second through ``n + 1`` columns are the results of the aggregations. - If groupby key is None then the key part of return is omitted. + If groupby key is ``None`` then the key part of return is omitted. """ if len(aggs) == 0: @@ -114,6 +114,7 @@ def aggregate(self, *aggs: AggregateFn) -> Dataset[U]: def _aggregate_on(self, agg_cls: type, on: AggregateOnT, *args, **kwargs): """Helper for aggregating on a particular subset of the dataset. + This validates the `on` argument, and converts a list of column names or lambdas to a multi-aggregation. A null `on` results in a multi-aggregation on all columns for an Arrow Dataset, and a single @@ -135,120 +136,244 @@ def count(self) -> Dataset[U]: ... "A").count() Returns: - A simple dataset of (k, v) pairs or - an Arrow dataset of [k, v] columns - where k is the groupby key and - v is the number of rows with that key. - If groupby key is None then the key part of return is omitted. + A simple dataset of ``(k, v)`` pairs or an Arrow dataset of + ``[k, v]`` columns where ``k`` is the groupby key and ``v`` is the + number of rows with that key. + If groupby key is ``None`` then the key part of return is omitted. 
""" return self.aggregate(Count()) def sum(self, on: AggregateOnT = None) -> Dataset[U]: - """Compute sum aggregation. + """Compute grouped sum aggregation. This is a blocking operation. Examples: >>> ray.data.range(100).groupby(lambda x: x % 3).sum() >>> ray.data.from_items([ - ... {"A": x % 3, "B": x} for x in range(100)]).groupby( - ... "A").sum("B") + ... (i % 3, i, i**2) + ... for i in range(100)]) \ + ... .groupby(lambda x: x[0] % 3) \ + ... .sum(lambda x: x[2]) + >>> ray.data.range_arrow(100).groupby("value").sum() + >>> ray.data.from_items([ + ... {"A": i % 3, "B": i, "C": i**2} + ... for i in range(100)]) \ + ... .groupby("A") \ + ... .sum(["B", "C"]) Args: - on: The data to sum on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the sum. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to take a sum of all rows. + - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to do a column-wise sum of all + columns. Returns: - A simple dataset of (k, v) pairs or - an Arrow dataset of [k, v] columns - where k is the groupby key and - v is the sum result. - If groupby key is None then the key part of return is omitted. + The sum result. + + For a simple dataset, the output is: + + - ``on=None``: a simple dataset of ``(k, sum)`` tuples where ``k`` + is the groupby key and ``sum`` is sum of all rows in that group. + - ``on=[callable_1, ..., callable_n]``: a simple dataset of + ``(k, sum_1, ..., sum_n)`` tuples where ``k`` is the groupby key + and ``sum_i`` is sum of the outputs of the ith callable called on + each row in that group. + + For an Arrow dataset, the output is: + + - ``on=None``: an Arrow dataset containing a groupby key column, + ``"k"``, and a column-wise sum column for each original column + in the dataset. 
+ - ``on=["col_1", ..., "col_n"]``: an Arrow dataset of ``n + 1`` + columns where the first column is the groupby key and the second + through ``n + 1`` columns are the results of the aggregations. + + If groupby key is ``None`` then the key part of return is omitted. """ return self._aggregate_on(Sum, on) def min(self, on: AggregateOnT = None) -> Dataset[U]: - """Compute min aggregation. + """Compute grouped min aggregation. This is a blocking operation. Examples: >>> ray.data.range(100).groupby(lambda x: x % 3).min() >>> ray.data.from_items([ - ... {"A": x % 3, "B": x} for x in range(100)]).groupby( - ... "A").min("B") + ... (i % 3, i, i**2) + ... for i in range(100)]) \ + ... .groupby(lambda x: x[0] % 3) \ + ... .min(lambda x: x[2]) + >>> ray.data.range_arrow(100).groupby("value").min() + >>> ray.data.from_items([ + ... {"A": i % 3, "B": i, "C": i**2} + ... for i in range(100)]) \ + ... .groupby("A") \ + ... .min(["B", "C"]) Args: - on: The data to min on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the min. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to take a min of all rows. + - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to do a column-wise min of all + columns. Returns: - A simple dataset of (k, v) pairs or - an Arrow dataset of [k, v] columns - where k is the groupby key and - v is the min result. - If groupby key is None then the key part of return is omitted. + The min result. + + For a simple dataset, the output is: + + - ``on=None``: a simple dataset of ``(k, min)`` tuples where ``k`` + is the groupby key and ``min`` is min of all rows in that group. + - ``on=[callable_1, ..., callable_n]``: a simple dataset of + ``(k, min_1, ..., min_n)`` tuples where ``k`` is the groupby key + and ``min_i`` is min of the outputs of the ith callable called on + each row in that group. 
+ + For an Arrow dataset, the output is: + + - ``on=None``: an Arrow dataset containing a groupby key column, + ``"k"``, and a column-wise min column for each original column in + the dataset. + - ``on=["col_1", ..., "col_n"]``: an Arrow dataset of ``n + 1`` + columns where the first column is the groupby key and the second + through ``n + 1`` columns are the results of the aggregations. + + If groupby key is ``None`` then the key part of return is omitted. """ return self._aggregate_on(Min, on) def max(self, on: AggregateOnT = None) -> Dataset[U]: - """Compute max aggregation. + """Compute grouped max aggregation. This is a blocking operation. Examples: >>> ray.data.range(100).groupby(lambda x: x % 3).max() >>> ray.data.from_items([ - ... {"A": x % 3, "B": x} for x in range(100)]).groupby( - ... "A").max("B") + ... (i % 3, i, i**2) + ... for i in range(100)]) \ + ... .groupby(lambda x: x[0] % 3) \ + ... .max(lambda x: x[2]) + >>> ray.data.range_arrow(100).groupby("value").max() + >>> ray.data.from_items([ + ... {"A": i % 3, "B": i, "C": i**2} + ... for i in range(100)]) \ + ... .groupby("A") \ + ... .max(["B", "C"]) Args: - on: The data to max on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the max. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to take a max of all rows. + - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to do a column-wise max of all + columns. Returns: - A simple dataset of (k, v) pairs or - an Arrow dataset of [k, v] columns - where k is the groupby key and - v is the max result. - If groupby key is None then the key part of return is omitted. + The max result. + + For a simple dataset, the output is: + + - ``on=None``: a simple dataset of ``(k, max)`` tuples where ``k`` + is the groupby key and ``max`` is max of all rows in that group. 
+ - ``on=[callable_1, ..., callable_n]``: a simple dataset of + ``(k, max_1, ..., max_n)`` tuples where ``k`` is the groupby key + and ``max_i`` is max of the outputs of the ith callable called on + each row in that group. + + For an Arrow dataset, the output is: + + - ``on=None``: an Arrow dataset containing a groupby key column, + ``"k"``, and a column-wise max column for each original column in + the dataset. + - ``on=["col_1", ..., "col_n"]``: an Arrow dataset of ``n + 1`` + columns where the first column is the groupby key and the second + through ``n + 1`` columns are the results of the aggregations. + + If groupby key is ``None`` then the key part of return is omitted. """ return self._aggregate_on(Max, on) def mean(self, on: AggregateOnT = None) -> Dataset[U]: - """Compute mean aggregation. + """Compute grouped mean aggregation. This is a blocking operation. Examples: >>> ray.data.range(100).groupby(lambda x: x % 3).mean() >>> ray.data.from_items([ - ... {"A": x % 3, "B": x} for x in range(100)]).groupby( - ... "A").mean("B") + ... (i % 3, i, i**2) + ... for i in range(100)]) \ + ... .groupby(lambda x: x[0] % 3) \ + ... .mean(lambda x: x[2]) + >>> ray.data.range_arrow(100).groupby("value").mean() + >>> ray.data.from_items([ + ... {"A": i % 3, "B": i, "C": i**2} + ... for i in range(100)]) \ + ... .groupby("A") \ + ... .mean(["B", "C"]) Args: - on: The data to mean on. - It can be the column name for Arrow dataset. + on: The data subset on which to compute the mean. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to take a mean of all rows. + - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to do a column-wise mean of all + columns. Returns: - A simple dataset of (k, v) pairs or - an Arrow dataset of [k, v] columns - where k is the groupby key and - v is the mean result. - If groupby key is None then the key part of return is omitted. + The mean result. 
+ + For a simple dataset, the output is: + + - ``on=None``: a simple dataset of ``(k, mean)`` tuples where ``k`` + is the groupby key and ``mean`` is mean of all rows in that + group. + - ``on=[callable_1, ..., callable_n]``: a simple dataset of + ``(k, mean_1, ..., mean_n)`` tuples where ``k`` is the groupby + key and ``mean_i`` is mean of the outputs of the ith callable + called on each row in that group. + + For an Arrow dataset, the output is: + + - ``on=None``: an Arrow dataset containing a groupby key column, + ``"k"``, and a column-wise mean column for each original column + in the dataset. + - ``on=["col_1", ..., "col_n"]``: an Arrow dataset of ``n + 1`` + columns where the first column is the groupby key and the second + through ``n + 1`` columns are the results of the aggregations. + + If groupby key is ``None`` then the key part of return is omitted. """ return self._aggregate_on(Mean, on) def std(self, on: AggregateOnT = None, ddof: int = 1) -> Dataset[U]: - """Compute standard deviation aggregation. + """Compute grouped standard deviation aggregation. This is a blocking operation. Examples: >>> ray.data.range(100).groupby(lambda x: x % 3).std() >>> ray.data.from_items([ - ... {"A": x % 3, "B": x} for x in range(100)]).groupby( - ... "A").std("B") + ... (i % 3, i, i**2) + ... for i in range(100)]) \ + ... .groupby(lambda x: x[0] % 3) \ + ... .std(lambda x: x[2]) + >>> ray.data.range_arrow(100).groupby("value").std(ddof=0) + >>> ray.data.from_items([ + ... {"A": i % 3, "B": i, "C": i**2} + ... for i in range(100)]) \ + ... .groupby("A") \ + ... .std(["B", "C"]) NOTE: This uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen due to @@ -259,17 +384,38 @@ def std(self, on: AggregateOnT = None, ddof: int = 1) -> Dataset[U]: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm Args: - on: The data on which to compute the standard deviation. 
- It can be the column name for Arrow dataset. + on: The data subset on which to compute the std. + + - For a simple dataset: it can be a callable or a list thereof, + and the default is to take a std of all rows. + - For an Arrow dataset: it can be a column name or a list + thereof, and the default is to do a column-wise std of all + columns. ddof: Delta Degrees of Freedom. The divisor used in calculations - is N - ddof, where N represents the number of elements. + is ``N - ddof``, where ``N`` represents the number of elements. Returns: - A simple dataset of (k, v) pairs or - an Arrow dataset of [k, v] columns - where k is the groupby key and - v is the standard deviation result. - If groupby key is None then the key part of return is omitted. + The standard deviation result. + + For a simple dataset, the output is: + + - ``on=None``: a simple dataset of ``(k, std)`` tuples where ``k`` + is the groupby key and ``std`` is std of all rows in that group. + - ``on=[callable_1, ..., callable_n]``: a simple dataset of + ``(k, std_1, ..., std_n)`` tuples where ``k`` is the groupby key + and ``std_i`` is std of the outputs of the ith callable called on + each row in that group. + + For an Arrow dataset, the output is: + + - ``on=None``: an Arrow dataset containing a groupby key column, + ``"k"``, and a column-wise std column for each original column in + the dataset. + - ``on=["col_1", ..., "col_n"]``: an Arrow dataset of ``n + 1`` + columns where the first column is the groupby key and the second + through ``n + 1`` columns are the results of the aggregations. + + If groupby key is ``None`` then the key part of return is omitted. """ return self._aggregate_on(Std, on, ddof=ddof)