Skip to content

Commit

Permalink
Add some sanity checks for memory use in dataset (#22642)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl authored Feb 26, 2022
1 parent 4bf587f commit ae16aa1
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
/python/ray/tune/ @ray-project/ray-tune

# Ray data.
/python/ray/data/ @ericl @scv119 @clarkzinzow
/doc/source/data/ @ericl @scv119 @clarkzinzow
/python/ray/data/ @ericl @scv119 @clarkzinzow @jjyao
/doc/source/data/ @ericl @scv119 @clarkzinzow @jjyao

# Ray workflows.
/python/ray/workflow/ @ericl @iycheng
Expand Down
1 change: 1 addition & 0 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def __init__(
self._lazy = lazy

if not lazy:
# TODO(ekl) we should clear inputs once we have full lineage recorded.
self._plan.execute(clear_input_blocks=False)

def map(
Expand Down
66 changes: 66 additions & 0 deletions python/ray/data/tests/test_optimize.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import pytest
import numpy as np
import pandas as pd
import os

import ray
from ray.data.context import DatasetContext
from ray.data.datasource.csv_datasource import CSVDatasource
from ray.internal.internal_api import memory_summary

from ray.tests.conftest import * # noqa

Expand All @@ -17,6 +19,70 @@ def expect_stages(pipe, num_stages_expected, stage_names):
assert len(pipe._optimized_stages) == num_stages_expected, pipe._optimized_stages


def test_memory_sanity(shutdown_only):
    """Check that an oversized map output is reported as spilled.

    Each of the 10 rows is mapped to a 100 MiB uint8 array (about 1 GiB in
    total) while the object store is capped at 500e6 bytes, so the memory
    summary is expected to mention spilling.
    """

    def _make_100mib_array(_row):
        return np.ones(100 * 1024 * 1024, dtype=np.uint8)

    ctx = ray.init(num_cpus=1, object_store_memory=500e6)
    # Keep the mapped dataset bound to a name so its blocks stay referenced
    # while the memory summary is collected.
    ds = ray.data.range(10)
    ds = ds.map(_make_100mib_array)
    summary = memory_summary(ctx.address_info["address"], stats_only=True)

    # Sanity check spilling is happening as expected.
    assert "Spilled" in summary, summary


def test_memory_release_eager(shutdown_only):
    """Check object store usage across two eager map rounds.

    Each round maps 10 rows to 100 MiB uint8 arrays (about 1 GiB per round)
    with the object store capped at 1500e6 bytes. Round 1 alone must not
    spill. The matching check for round 2 is disabled until inputs are
    cleared on eager execution (see the TODO below).
    """
    info = ray.init(num_cpus=1, object_store_memory=1500e6)
    address = info.address_info["address"]
    ds = ray.data.range(10)

    # Round 1.
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    meminfo = memory_summary(address, stats_only=True)
    assert "Spilled" not in meminfo, meminfo

    # Round 2.
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    # Look the address up the same way as round 1 (via ``address_info``)
    # rather than indexing the init result directly. The summary is still
    # requested so the call keeps being exercised, but its contents are not
    # asserted on yet.
    memory_summary(address, stats_only=True)
    # TODO(ekl) we can add this back once we clear inputs on eager exec as well.
    # assert "Spilled" not in meminfo, meminfo


def test_memory_release_lazy(shutdown_only):
    """Check that three chained lazy maps run without spilling.

    The dataset is switched to lazy mode, three maps that each produce a
    100 MiB uint8 array per row are queued, and the plan is then executed
    with the object store capped at 1500e6 bytes.
    """

    def _make_100mib_array(_row):
        return np.ones(100 * 1024 * 1024, dtype=np.uint8)

    ctx = ray.init(num_cpus=1, object_store_memory=1500e6)
    ds = ray.data.range(10)

    # Should get fused into single stage.
    ds = ds._experimental_lazy()
    for _ in range(3):
        ds = ds.map(_make_100mib_array)
    ds.fully_executed()

    summary = memory_summary(ctx.address_info["address"], stats_only=True)
    assert "Spilled" not in summary, summary


def test_memory_release_lazy_shuffle(shutdown_only):
    """Check that a lazy map followed by a random shuffle does not spill.

    The scenario is attempted up to three times, each on a freshly started
    Ray instance that is shut down afterwards. The test passes on the first
    successful attempt; if all attempts fail, the last failure is re-raised.
    """
    # TODO(ekl) why is this flaky? Due to eviction delay?
    last_failure = None
    for attempt in range(3):
        print("Try", attempt)
        try:
            ctx = ray.init(num_cpus=1, object_store_memory=1800e6)
            ds = ray.data.range(10)

            # Should get fused into single stage.
            ds = ds._experimental_lazy()
            ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
            ds.random_shuffle().fully_executed()
            summary = memory_summary(
                ctx.address_info["address"], stats_only=True
            )
            assert "Spilled" not in summary, summary
        except Exception as exc:
            # Remember the failure and move on to the next attempt.
            last_failure = exc
            print("Failed", exc)
        else:
            return
        finally:
            # Runs on success and on failure, so every attempt starts clean.
            ray.shutdown()
    raise last_failure


def test_spread_hint_inherit(ray_start_regular_shared):
ds = ray.data.range(10)._experimental_lazy()
ds = ds.map(lambda x: x + 1)
Expand Down

0 comments on commit ae16aa1

Please sign in to comment.