Skip to content

Commit

Permalink
[Datasets] Change range_arrow() API to range_table() (ray-project…
Browse files Browse the repository at this point in the history
…#24704)

This PR renames the `ray.data.range_arrow()` API to `ray.data.range_table()`, making the Arrow representation an implementation detail.
  • Loading branch information
clarkzinzow committed May 20, 2022
1 parent fcc039c commit d589329
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 64 deletions.
2 changes: 1 addition & 1 deletion doc/source/data/getting-started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Transformations are executed *eagerly* and block until the operation is finished
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    """Return a new DataFrame with every element of *df* doubled."""
    def _double(cell):
        # Elementwise doubling; works for any type supporting ``* 2``.
        return cell * 2

    return df.applymap(_double)
ds = ray.data.range_arrow(10000)
ds = ray.data.range_table(10000)
ds = ds.map_batches(transform_batch, batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
Expand Down
2 changes: 1 addition & 1 deletion doc/source/data/package-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Creating Datasets
-----------------

.. autofunction:: ray.data.range
.. autofunction:: ray.data.range_arrow
.. autofunction:: ray.data.range_table
.. autofunction:: ray.data.range_tensor
.. autofunction:: ray.data.read_csv
.. autofunction:: ray.data.read_json
Expand Down
2 changes: 1 addition & 1 deletion doc/source/data/random-access.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Any Arrow-format dataset can be enabled for random access by calling ``dataset.t
.. code-block:: python
# Generate a dummy embedding table as an example.
ds = ray.data.range_arrow(100)
ds = ray.data.range_table(100)
ds = ds.add_column("embedding", lambda b: b["value"] ** 2)
# -> schema={value: int64, embedding: int64}
Expand Down
2 changes: 1 addition & 1 deletion doc/source/data/transforming-datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Transformations are executed *eagerly* and block until the operation is finished
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    """Double each element of the batch and return the result."""
    doubled = df.applymap(lambda cell: cell * 2)
    return doubled
ds = ray.data.range_arrow(10000)
ds = ray.data.range_table(10000)
ds = ds.map_batches(transform_batch, batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
Expand Down
5 changes: 3 additions & 2 deletions python/ray/data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from ray.data.read_api import (
from ray.data.read_api import ( # noqa: F401
from_items,
range,
range_table,
range_arrow,
range_tensor,
read_parquet,
Expand Down Expand Up @@ -53,7 +54,7 @@
"from_spark",
"from_huggingface",
"range",
"range_arrow",
"range_table",
"range_tensor",
"read_text",
"read_binary_files",
Expand Down
14 changes: 7 additions & 7 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def add_column(
Examples:
>>> import ray
>>> ds = ray.data.range_arrow(100) # doctest: +SKIP
>>> ds = ray.data.range_table(100) # doctest: +SKIP
>>> # Add a new column equal to value * 2.
>>> ds = ds.add_column( # doctest: +SKIP
... "new_col", lambda df: df["value"] * 2)
Expand Down Expand Up @@ -1108,7 +1108,7 @@ def aggregate(self, *aggs: AggregateFn) -> U:
>>> import ray
>>> from ray.data.aggregate import Max, Mean
>>> ray.data.range(100).aggregate(Max()) # doctest: +SKIP
>>> ray.data.range_arrow(100).aggregate( # doctest: +SKIP
>>> ray.data.range_table(100).aggregate( # doctest: +SKIP
... Max("value"), Mean("value")) # doctest: +SKIP
Time complexity: O(dataset size / parallelism)
Expand Down Expand Up @@ -1141,7 +1141,7 @@ def sum(
>>> ray.data.from_items([ # doctest: +SKIP
... (i, i**2) # doctest: +SKIP
... for i in range(100)]).sum(lambda x: x[1]) # doctest: +SKIP
>>> ray.data.range_arrow(100).sum("value") # doctest: +SKIP
>>> ray.data.range_table(100).sum("value") # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i, "B": i**2} # doctest: +SKIP
... for i in range(100)]).sum(["A", "B"]) # doctest: +SKIP
Expand Down Expand Up @@ -1200,7 +1200,7 @@ def min(
>>> ray.data.from_items([ # doctest: +SKIP
... (i, i**2) # doctest: +SKIP
... for i in range(100)]).min(lambda x: x[1]) # doctest: +SKIP
>>> ray.data.range_arrow(100).min("value") # doctest: +SKIP
>>> ray.data.range_table(100).min("value") # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i, "B": i**2} # doctest: +SKIP
... for i in range(100)]).min(["A", "B"]) # doctest: +SKIP
Expand Down Expand Up @@ -1259,7 +1259,7 @@ def max(
>>> ray.data.from_items([ # doctest: +SKIP
... (i, i**2) # doctest: +SKIP
... for i in range(100)]).max(lambda x: x[1]) # doctest: +SKIP
>>> ray.data.range_arrow(100).max("value") # doctest: +SKIP
>>> ray.data.range_table(100).max("value") # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i, "B": i**2} # doctest: +SKIP
... for i in range(100)]).max(["A", "B"]) # doctest: +SKIP
Expand Down Expand Up @@ -1318,7 +1318,7 @@ def mean(
>>> ray.data.from_items([ # doctest: +SKIP
... (i, i**2) # doctest: +SKIP
... for i in range(100)]).mean(lambda x: x[1]) # doctest: +SKIP
>>> ray.data.range_arrow(100).mean("value") # doctest: +SKIP
>>> ray.data.range_table(100).mean("value") # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i, "B": i**2} # doctest: +SKIP
... for i in range(100)]).mean(["A", "B"]) # doctest: +SKIP
Expand Down Expand Up @@ -1380,7 +1380,7 @@ def std(
>>> ray.data.from_items([ # doctest: +SKIP
... (i, i**2) # doctest: +SKIP
... for i in range(100)]).std(lambda x: x[1]) # doctest: +SKIP
>>> ray.data.range_arrow(100).std("value", ddof=0) # doctest: +SKIP
>>> ray.data.range_table(100).std("value", ddof=0) # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i, "B": i**2} # doctest: +SKIP
... for i in range(100)]).std(["A", "B"]) # doctest: +SKIP
Expand Down
16 changes: 10 additions & 6 deletions python/ray/data/datasource/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,22 @@ def prepare_read(
# from an external system instead of generating dummy data.
def make_block(start: int, count: int) -> Block:
if block_format == "arrow":
return pyarrow.Table.from_arrays(
import pyarrow as pa

return pa.Table.from_arrays(
[np.arange(start, start + count)], names=["value"]
)
elif block_format == "tensor":
import pyarrow as pa

tensor = TensorArray(
np.ones(tensor_shape, dtype=np.int64)
* np.expand_dims(
np.arange(start, start + count),
tuple(range(1, 1 + len(tensor_shape))),
)
)
return pyarrow.Table.from_pydict({"value": tensor})
return pa.Table.from_pydict({"value": tensor})
else:
return list(builtins.range(start, start + count))

Expand All @@ -204,21 +208,21 @@ def make_block(start: int, count: int) -> Block:
count = min(block_size, n - i)
if block_format == "arrow":
_check_pyarrow_version()
import pyarrow
import pyarrow as pa

schema = pyarrow.Table.from_pydict({"value": [0]}).schema
schema = pa.Table.from_pydict({"value": [0]}).schema
elif block_format == "tensor":
_check_pyarrow_version()
from ray.data.extensions import TensorArray
import pyarrow
import pyarrow as pa

tensor = TensorArray(
np.ones(tensor_shape, dtype=np.int64)
* np.expand_dims(
np.arange(0, 10), tuple(range(1, 1 + len(tensor_shape)))
)
)
schema = pyarrow.Table.from_pydict({"value": tensor}).schema
schema = pa.Table.from_pydict({"value": tensor}).schema
elif block_format == "list":
schema = int
else:
Expand Down
10 changes: 5 additions & 5 deletions python/ray/data/grouped_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def sum(
... for i in range(100)]) \ # doctest: +SKIP
... .groupby(lambda x: x[0] % 3) \ # doctest: +SKIP
... .sum(lambda x: x[2]) # doctest: +SKIP
>>> ray.data.range_arrow(100).groupby("value").sum() # doctest: +SKIP
>>> ray.data.range_table(100).groupby("value").sum() # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
... for i in range(100)]) \ # doctest: +SKIP
Expand Down Expand Up @@ -369,7 +369,7 @@ def min(
... for i in range(100)]) \ # doctest: +SKIP
... .groupby(lambda x: x[0] % 3) \ # doctest: +SKIP
... .min(lambda x: x[2]) # doctest: +SKIP
>>> ray.data.range_arrow(100).groupby("value").min() # doctest: +SKIP
>>> ray.data.range_table(100).groupby("value").min() # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
... for i in range(100)]) \ # doctest: +SKIP
Expand Down Expand Up @@ -430,7 +430,7 @@ def max(
... for i in range(100)]) \ # doctest: +SKIP
... .groupby(lambda x: x[0] % 3) \ # doctest: +SKIP
... .max(lambda x: x[2]) # doctest: +SKIP
>>> ray.data.range_arrow(100).groupby("value").max() # doctest: +SKIP
>>> ray.data.range_table(100).groupby("value").max() # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
... for i in range(100)]) \ # doctest: +SKIP
Expand Down Expand Up @@ -491,7 +491,7 @@ def mean(
... for i in range(100)]) \ # doctest: +SKIP
... .groupby(lambda x: x[0] % 3) \ # doctest: +SKIP
... .mean(lambda x: x[2]) # doctest: +SKIP
>>> ray.data.range_arrow(100).groupby("value").mean() # doctest: +SKIP
>>> ray.data.range_table(100).groupby("value").mean() # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
... for i in range(100)]) \ # doctest: +SKIP
Expand Down Expand Up @@ -556,7 +556,7 @@ def std(
... for i in range(100)]) \ # doctest: +SKIP
... .groupby(lambda x: x[0] % 3) \ # doctest: +SKIP
... .std(lambda x: x[2]) # doctest: +SKIP
>>> ray.data.range_arrow(100).groupby("value").std(ddof=0) # doctest: +SKIP
>>> ray.data.range_table(100).groupby("value").std(ddof=0) # doctest: +SKIP
>>> ray.data.from_items([ # doctest: +SKIP
... {"A": i % 3, "B": i, "C": i**2} # doctest: +SKIP
... for i in range(100)]) \ # doctest: +SKIP
Expand Down
12 changes: 8 additions & 4 deletions python/ray/data/read_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ def range(n: int, *, parallelism: int = 200) -> Dataset[int]:


@PublicAPI
def range_arrow(n: int, *, parallelism: int = 200) -> Dataset[ArrowRow]:
"""Create an Arrow dataset from a range of integers [0..n).
def range_table(n: int, *, parallelism: int = 200) -> Dataset[ArrowRow]:
"""Create a tabular dataset from a range of integers [0..n).
Examples:
>>> import ray
>>> ds = ray.data.range_arrow(1000) # doctest: +SKIP
>>> ds = ray.data.range_table(1000) # doctest: +SKIP
>>> ds.map(lambda r: {"v2": r["value"] * 2}).show() # doctest: +SKIP
This is similar to range(), but uses Arrow tables to hold the integers
Expand All @@ -159,6 +159,10 @@ def range_arrow(n: int, *, parallelism: int = 200) -> Dataset[ArrowRow]:
)


def range_arrow(*args, **kwargs):
    """Hard-deprecated alias of :func:`range_table`; always raises.

    The Arrow representation is now an implementation detail, so the old
    name fails loudly instead of silently aliasing the new API.
    """
    # NOTE: deliberately raises (hard break) rather than warning softly.
    message = "range_arrow() is deprecated, use range_table() instead."
    raise DeprecationWarning(message)


@PublicAPI
def range_tensor(
n: int, *, shape: Tuple = (1,), parallelism: int = 200
Expand All @@ -171,7 +175,7 @@ def range_tensor(
>>> ds.map_batches( # doctest: +SKIP
... lambda arr: arr * 2, batch_format="pandas").show()
This is similar to range_arrow(), but uses the ArrowTensorArray extension
This is similar to range_table(), but uses the ArrowTensorArray extension
type. The dataset elements take the form {"value": array(N, shape=shape)}.
Args:
Expand Down
Loading

0 comments on commit d589329

Please sign in to comment.