diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 8507f66c0cb3..aadab87bfd8e 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -51,8 +51,8 @@
 /python/ray/tune/ @ray-project/ray-tune
 
 # Ray data.
-/python/ray/data/ @ericl @scv119 @clarkzinzow
-/doc/source/data/ @ericl @scv119 @clarkzinzow
+/python/ray/data/ @ericl @scv119 @clarkzinzow @jjyao
+/doc/source/data/ @ericl @scv119 @clarkzinzow @jjyao
 
 # Ray workflows.
 /python/ray/workflow/ @ericl @iycheng
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 255b5a373def..c81fd7a99854 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -124,6 +124,7 @@ def __init__(
         self._lazy = lazy
 
         if not lazy:
+            # TODO(ekl) we should clear inputs once we have full lineage recorded.
             self._plan.execute(clear_input_blocks=False)
 
     def map(
diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py
index 34e2190c8d20..bb2f353335b2 100644
--- a/python/ray/data/tests/test_optimize.py
+++ b/python/ray/data/tests/test_optimize.py
@@ -1,10 +1,12 @@
 import pytest
+import numpy as np
 import pandas as pd
 import os
 
 import ray
 from ray.data.context import DatasetContext
 from ray.data.datasource.csv_datasource import CSVDatasource
+from ray.internal.internal_api import memory_summary
 from ray.tests.conftest import *  # noqa
 
 
@@ -17,6 +19,70 @@
     assert len(pipe._optimized_stages) == num_stages_expected, pipe._optimized_stages
 
 
+def test_memory_sanity(shutdown_only):
+    info = ray.init(num_cpus=1, object_store_memory=500e6)
+    ds = ray.data.range(10)
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+
+    # Sanity check spilling is happening as expected.
+    assert "Spilled" in meminfo, meminfo
+
+
+def test_memory_release_eager(shutdown_only):
+    info = ray.init(num_cpus=1, object_store_memory=1500e6)
+    ds = ray.data.range(10)
+
+    # Round 1.
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+    assert "Spilled" not in meminfo, meminfo
+
+    # Round 2.
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+    # TODO(ekl) we can add this back once we clear inputs on eager exec as well.
+    # assert "Spilled" not in meminfo, meminfo
+
+
+def test_memory_release_lazy(shutdown_only):
+    info = ray.init(num_cpus=1, object_store_memory=1500e6)
+    ds = ray.data.range(10)
+
+    # Should get fused into single stage.
+    ds = ds._experimental_lazy()
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    ds.fully_executed()
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+    assert "Spilled" not in meminfo, meminfo
+
+
+def test_memory_release_lazy_shuffle(shutdown_only):
+    # TODO(ekl) why is this flaky? Due to eviction delay?
+    error = None
+    for trial in range(3):
+        print("Try", trial)
+        try:
+            info = ray.init(num_cpus=1, object_store_memory=1800e6)
+            ds = ray.data.range(10)
+
+            # Should get fused into single stage.
+            ds = ds._experimental_lazy()
+            ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+            ds.random_shuffle().fully_executed()
+            meminfo = memory_summary(info.address_info["address"], stats_only=True)
+            assert "Spilled" not in meminfo, meminfo
+            return
+        except Exception as e:
+            error = e
+            print("Failed", e)
+        finally:
+            ray.shutdown()
+    raise error
+
+
 def test_spread_hint_inherit(ray_start_regular_shared):
     ds = ray.data.range(10)._experimental_lazy()
     ds = ds.map(lambda x: x + 1)