From ae16aa1dbabfa448d64a6070260358bc2b0aeec6 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Fri, 25 Feb 2022 16:59:12 -0800
Subject: [PATCH] Add some sanity checks for memory use in dataset (#22642)

---
 .github/CODEOWNERS                     |  4 +-
 python/ray/data/dataset.py             |  1 +
 python/ray/data/tests/test_optimize.py | 66 ++++++++++++++++++++++++++
 3 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 4161adbd0921..271e5aeccf25 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -51,8 +51,8 @@
 /python/ray/tune/ @ray-project/ray-tune
 
 # Ray data.
-/python/ray/data/ @ericl @scv119 @clarkzinzow
-/doc/source/data/ @ericl @scv119 @clarkzinzow
+/python/ray/data/ @ericl @scv119 @clarkzinzow @jjyao
+/doc/source/data/ @ericl @scv119 @clarkzinzow @jjyao
 
 # Ray workflows.
 /python/ray/workflow/ @ericl @iycheng
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 255b5a373def..c81fd7a99854 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -124,6 +124,7 @@ def __init__(
         self._lazy = lazy
 
         if not lazy:
+            # TODO(ekl) we should clear inputs once we have full lineage recorded.
             self._plan.execute(clear_input_blocks=False)
 
     def map(
diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py
index 34e2190c8d20..bb2f353335b2 100644
--- a/python/ray/data/tests/test_optimize.py
+++ b/python/ray/data/tests/test_optimize.py
@@ -1,10 +1,12 @@
 import pytest
+import numpy as np
 import pandas as pd
 import os
 
 import ray
 from ray.data.context import DatasetContext
 from ray.data.datasource.csv_datasource import CSVDatasource
+from ray.internal.internal_api import memory_summary
 from ray.tests.conftest import *  # noqa
 
 
@@ -17,6 +19,70 @@ def expect_stages(pipe, num_stages_expected, stage_names):
     assert len(pipe._optimized_stages) == num_stages_expected, pipe._optimized_stages
 
 
+def test_memory_sanity(shutdown_only):
+    info = ray.init(num_cpus=1, object_store_memory=500e6)
+    ds = ray.data.range(10)
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+
+    # Sanity check spilling is happening as expected.
+    assert "Spilled" in meminfo, meminfo
+
+
+def test_memory_release_eager(shutdown_only):
+    info = ray.init(num_cpus=1, object_store_memory=1500e6)
+    ds = ray.data.range(10)
+
+    # Round 1.
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+    assert "Spilled" not in meminfo, meminfo
+
+    # Round 2.
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+    # TODO(ekl) we can add this back once we clear inputs on eager exec as well.
+    # assert "Spilled" not in meminfo, meminfo
+
+
+def test_memory_release_lazy(shutdown_only):
+    info = ray.init(num_cpus=1, object_store_memory=1500e6)
+    ds = ray.data.range(10)
+
+    # Should get fused into single stage.
+    ds = ds._experimental_lazy()
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+    ds.fully_executed()
+    meminfo = memory_summary(info.address_info["address"], stats_only=True)
+    assert "Spilled" not in meminfo, meminfo
+
+
+def test_memory_release_lazy_shuffle(shutdown_only):
+    # TODO(ekl) why is this flaky? Due to eviction delay?
+    error = None
+    for trial in range(3):
+        print("Try", trial)
+        try:
+            info = ray.init(num_cpus=1, object_store_memory=1800e6)
+            ds = ray.data.range(10)
+
+            # Should get fused into single stage.
+            ds = ds._experimental_lazy()
+            ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
+            ds.random_shuffle().fully_executed()
+            meminfo = memory_summary(info.address_info["address"], stats_only=True)
+            assert "Spilled" not in meminfo, meminfo
+            return
+        except Exception as e:
+            error = e
+            print("Failed", e)
+        finally:
+            ray.shutdown()
+    raise error
+
+
 def test_spread_hint_inherit(ray_start_regular_shared):
     ds = ray.data.range(10)._experimental_lazy()
     ds = ds.map(lambda x: x + 1)