Add some sanity checks for memory use in dataset #22642

Merged (4 commits, Feb 26, 2022)
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
@@ -51,8 +51,8 @@
/python/ray/tune/ @ray-project/ray-tune

# Ray data.
-/python/ray/data/ @ericl @scv119 @clarkzinzow
-/doc/source/data/ @ericl @scv119 @clarkzinzow
+/python/ray/data/ @ericl @scv119 @clarkzinzow @jjyao
+/doc/source/data/ @ericl @scv119 @clarkzinzow @jjyao

# Ray workflows.
/python/ray/workflow/ @ericl @iycheng
1 change: 1 addition & 0 deletions python/ray/data/dataset.py
@@ -124,6 +124,7 @@ def __init__(
        self._lazy = lazy

        if not lazy:
            # TODO(ekl) we should clear inputs once we have full lineage recorded.
            self._plan.execute(clear_input_blocks=False)

    def map(
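Some context on the TODO above, since the distinction drives the tests below (a sketch using the experimental APIs this PR exercises, with block sizes shrunk for illustration): in eager mode each `.map()` executes immediately and, with `clear_input_blocks=False`, the input blocks of every executed stage stay pinned in the object store; in lazy mode the chain is only recorded and runs once on `fully_executed()`, which lets stages be fused and intermediate blocks be released earlier.

```python
import numpy as np
import ray

ray.init(num_cpus=1)

# Eager (default): this map() executes right here, and its input blocks
# are kept alive because clear_input_blocks=False above.
eager_ds = ray.data.range(10)
eager_ds = eager_ds.map(lambda x: np.ones(1024 * 1024, dtype=np.uint8))

# Lazy (experimental): nothing executes until fully_executed(), so the
# read and the map can be fused into a single stage.
lazy_ds = ray.data.range(10)._experimental_lazy()
lazy_ds = lazy_ds.map(lambda x: np.ones(1024 * 1024, dtype=np.uint8))
lazy_ds = lazy_ds.fully_executed()
```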
66 changes: 66 additions & 0 deletions python/ray/data/tests/test_optimize.py
@@ -1,10 +1,12 @@
import pytest
import numpy as np
import pandas as pd
import os

import ray
from ray.data.context import DatasetContext
from ray.data.datasource.csv_datasource import CSVDatasource
from ray.internal.internal_api import memory_summary

from ray.tests.conftest import * # noqa

@@ -17,6 +19,70 @@ def expect_stages(pipe, num_stages_expected, stage_names):
    assert len(pipe._optimized_stages) == num_stages_expected, pipe._optimized_stages


def test_memory_sanity(shutdown_only):
    info = ray.init(num_cpus=1, object_store_memory=500e6)
    ds = ray.data.range(10)
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    meminfo = memory_summary(info.address_info["address"], stats_only=True)

    # Sanity check spilling is happening as expected: 10 rows x 100MiB each
    # (~1GiB) cannot fit in a 500MB object store.
    assert "Spilled" in meminfo, meminfo


def test_memory_release_eager(shutdown_only):
    info = ray.init(num_cpus=1, object_store_memory=1500e6)
    ds = ray.data.range(10)

    # Round 1: ~1GiB of output fits in the 1500MB object store without spilling.
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    meminfo = memory_summary(info.address_info["address"], stats_only=True)
    assert "Spilled" not in meminfo, meminfo

    # Round 2: another ~1GiB spills unless round 1's blocks are released.
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    meminfo = memory_summary(info.address_info["address"], stats_only=True)
    # TODO(ekl) we can add this back once we clear inputs on eager exec as well.
    # assert "Spilled" not in meminfo, meminfo


def test_memory_release_lazy(shutdown_only):
    info = ray.init(num_cpus=1, object_store_memory=1500e6)
    ds = ray.data.range(10)

    # Should get fused into a single stage.
    ds = ds._experimental_lazy()
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
    ds.fully_executed()
    meminfo = memory_summary(info.address_info["address"], stats_only=True)
    assert "Spilled" not in meminfo, meminfo
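One way to check the fusion assumption directly, assuming `Dataset.stats()` is available in this Ray version and reports fused stage names (a hypothetical follow-up, not part of this PR):

```python
# Hypothetical: after execution, the stats output should show the read and
# the three maps as one fused stage (e.g. "read->map->map->map") rather
# than four separate stages.
print(ds.stats())
```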


def test_memory_release_lazy_shuffle(shutdown_only):
    # TODO(ekl) why is this flaky? Due to eviction delay?
Contributor Author: FYI @stephanie-wang, not sure if you also observed this in your shuffle experiments (memory sometimes not getting released in a timely manner during shuffle).

Contributor: Hmm, I think @franklsf95 actually did see something like this.

Contributor Author: It's quite odd. I don't think it's related to Python GC or anything. I figured it might be a unit-test artifact since we have some batching/delay before releasing memory, which would only show up in a fast test, but it could also be an issue at larger scale.

    error = None
    for trial in range(3):
        print("Try", trial)
        try:
            info = ray.init(num_cpus=1, object_store_memory=1800e6)
            ds = ray.data.range(10)

            # Should get fused into a single stage.
            ds = ds._experimental_lazy()
            ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
            ds.random_shuffle().fully_executed()
            meminfo = memory_summary(info.address_info["address"], stats_only=True)
            assert "Spilled" not in meminfo, meminfo
            return
        except Exception as e:
            error = e
            print("Failed", e)
        finally:
            ray.shutdown()
Collaborator: Will this cause a double shutdown, since you also have `shutdown_only`?

Contributor Author: Yeah, though it is harmless.
    raise error
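On the double-shutdown question above: as far as I know, `ray.shutdown()` is safe to call when no Ray instance is connected, so the explicit call in the `finally` block plus the `shutdown_only` fixture's teardown is harmless. A minimal sketch of the assumption:

```python
import ray

ray.init(num_cpus=1)
ray.shutdown()
# A second shutdown (e.g. from a test fixture's teardown) is a no-op,
# not an error, since there is no longer a connected Ray instance.
ray.shutdown()
```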


def test_spread_hint_inherit(ray_start_regular_shared):
    ds = ray.data.range(10)._experimental_lazy()
    ds = ds.map(lambda x: x + 1)