Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Out of Disk prevention #25370

Merged
merged 51 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
86ab5c7
add
scv119 Jun 1, 2022
0827fe6
add
scv119 Jun 1, 2022
2869ae1
add
scv119 Jun 1, 2022
85eec40
add
scv119 Jun 1, 2022
a9449bc
add
scv119 Jun 2, 2022
7c89d20
add
scv119 Jun 2, 2022
af82bcd
add
scv119 Jun 2, 2022
11fb149
ad
scv119 Jun 2, 2022
b771d84
add
scv119 Jun 2, 2022
328952e
add
scv119 Jun 2, 2022
ae001aa
update
scv119 Jun 2, 2022
f3ff35a
fix-tests
scv119 Jun 2, 2022
9efd766
add tests
scv119 Jun 3, 2022
cd6b0dd
add more tests
scv119 Jun 3, 2022
ad03dba
add
scv119 Jun 3, 2022
f47b977
add
scv119 Jun 3, 2022
35332e2
add
scv119 Jun 3, 2022
f14b8ab
add
scv119 Jun 3, 2022
16bcadb
update
scv119 Jun 3, 2022
8eae0c1
add
scv119 Jun 5, 2022
e44f519
add
scv119 Jun 5, 2022
7fcc50c
update
scv119 Jun 5, 2022
9068b57
add
scv119 Jun 5, 2022
79073d9
support spilled path
scv119 Jun 5, 2022
f53ff89
add
scv119 Jun 5, 2022
08a47c2
fix-build
scv119 Jun 6, 2022
08eb12a
linter
scv119 Jun 6, 2022
911fe2e
update
scv119 Jun 6, 2022
cf98607
fix-ci
scv119 Jun 6, 2022
b526d79
update
scv119 Jun 7, 2022
6586d40
add
scv119 Jun 7, 2022
fba24e3
address comments
scv119 Jun 9, 2022
4dc64db
add
scv119 Jun 9, 2022
ffd987c
working on tests
scv119 Jun 9, 2022
613a95c
fix test
scv119 Jun 13, 2022
cb89f97
update
scv119 Jun 13, 2022
b0fdc44
Merge remote-tracking branch 'upstream/master' into ood-take-1
scv119 Jun 17, 2022
9990a34
address comments
scv119 Jun 17, 2022
f44ffd1
add
scv119 Jun 17, 2022
9a54a66
add
scv119 Jun 20, 2022
cc3b06c
add
scv119 Jun 20, 2022
fc0dbc1
add test
scv119 Jun 20, 2022
af15ae9
add
scv119 Jun 20, 2022
127fafc
add
scv119 Jun 20, 2022
f9fde2c
add
scv119 Jun 20, 2022
baf19eb
Merge branch 'master' into ood-take-1
scv119 Jun 20, 2022
9cff82b
fix-windows
scv119 Jun 20, 2022
f3bc923
add
scv119 Jun 20, 2022
dc333af
fix ci
scv119 Jun 21, 2022
cd55ffb
Merge remote-tracking branch 'upstream/master' into ood-take-1
scv119 Jun 21, 2022
cb32c24
fix merge failure
scv119 Jun 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ cc_test(
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_common",
":ray_util",
"@com_google_googletest//:gtest_main",
],
Expand Down
5 changes: 5 additions & 0 deletions python/ray/_private/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ObjectReconstructionFailedError,
ObjectReconstructionFailedLineageEvictedError,
ObjectReconstructionFailedMaxAttemptsExceededError,
OutOfDiskError,
OwnerDiedError,
PlasmaObjectNotAvailable,
RayActorError,
Expand Down Expand Up @@ -276,6 +277,10 @@ def _deserialize_object(self, data, metadata, object_ref):
return ObjectFetchTimedOutError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
)
elif error_type == ErrorType.Value("OUT_OF_DISK_ERROR"):
return OutOfDiskError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
)
elif error_type == ErrorType.Value("OBJECT_DELETED"):
return ReferenceCountingAssertionError(
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
Expand Down
3 changes: 3 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ from ray.exceptions import (
RaySystemError,
RayTaskError,
ObjectStoreFullError,
OutOfDiskError,
GetTimeoutError,
TaskCancelledError,
AsyncioActorExit,
Expand Down Expand Up @@ -166,6 +167,8 @@ cdef int check_status(const CRayStatus& status) nogil except -1:

if status.IsObjectStoreFull():
raise ObjectStoreFullError(message)
elif status.IsOutOfDisk():
raise OutOfDiskError(message)
elif status.IsInterrupted():
raise KeyboardInterrupt()
elif status.IsTimedOut():
Expand Down
20 changes: 20 additions & 0 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,26 @@ def __str__(self):
)


@PublicAPI
class OutOfDiskError(RayError):
"""Indicates that the local disk is full.

This is raised if the attempt to store the object fails
because both the object store and disk are full.
"""

def __str__(self):
# TODO(scv119): expose more disk usage information and link to a doc.
return super(OutOfDiskError, self).__str__() + (
"\n"
"The object cannot be created because the local object store"
" is full and the local disk's utilization is over capacity"
" (95% by default)."
"Tip: Use `df` on this node to check disk usage and "
"`ray memory` to check object store memory usage."
)


@PublicAPI
class ObjectLostError(RayError):
"""Indicates that the object is lost from distributed memory, due to
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
c_bool IsUnknownError()
c_bool IsNotImplemented()
c_bool IsObjectStoreFull()
c_bool IsOutOfDisk()
c_bool IsRedisError()
c_bool IsTimedOut()
c_bool IsInterrupted()
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ py_test_module_list(
files = [
"test_basic_3.py",
"test_output.py",
"test_out_of_disk_space.py",
"test_failure_4.py",
"test_object_spilling.py",
"test_object_spilling_no_asan.py",
Expand Down
221 changes: 221 additions & 0 deletions python/ray/tests/test_out_of_disk_space.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import os
import platform
import shutil
import sys
import tempfile
import time
from contextlib import contextmanager

import numpy as np
import pytest

import ray
from ray.cluster_utils import Cluster


def calculate_capacity_threshold(disk_capacity_in_bytes):
usage = shutil.disk_usage("/tmp")
threshold = min(1, 1.0 - 1.0 * (usage.free - disk_capacity_in_bytes) / usage.total)
return threshold


def get_current_usage():
usage = shutil.disk_usage("/tmp")
print(f"free: {usage.free} ")
print(f"current usage: {1.0 - 1.0 * usage.free / usage.total}")
return 1.0 - 1.0 * usage.free / usage.total


@contextmanager
def create_tmp_file(bytes):
tmp_dir = tempfile.mkdtemp(dir="/tmp")
tmp_path = os.path.join(tmp_dir, "test.txt")
with open(tmp_path, "wb") as f:
f.write(os.urandom(bytes))
try:
yield tmp_path
finally:
os.remove(tmp_path)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_put_out_of_disk(shutdown_only):
local_fs_capacity_threshold = calculate_capacity_threshold(200 * 1024 * 1024)
ray.init(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": local_fs_capacity_threshold,
"local_fs_monitor_interval_ms": 10,
},
)
assert get_current_usage() < local_fs_capacity_threshold
ref = ray.put(np.random.rand(20 * 1024 * 1024))
del ref
# create a temp file so that the disk size is over the threshold.
# ray.put doesn't work is that fallback allocation uses mmaped file
# that doesn't neccssary allocate disk spaces.
with create_tmp_file(250 * 1024 * 1024):
assert get_current_usage() > local_fs_capacity_threshold
time.sleep(1)
with pytest.raises(ray.exceptions.OutOfDiskError):
ray.put(np.random.rand(20 * 1024 * 1024))
# delete tmp file to reclaim space back.

assert get_current_usage() < local_fs_capacity_threshold
time.sleep(1)
ray.put(np.random.rand(20 * 1024 * 1024))


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_task_returns(shutdown_only):
local_fs_capacity_threshold = calculate_capacity_threshold(10 * 1024 * 1024)
ray.init(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": local_fs_capacity_threshold,
"local_fs_monitor_interval_ms": 10,
},
)

# create a temp file so that the disk size is over the threshold.
# ray.put doesn't work is that fallback allocation uses mmaped file
# that doesn't neccssary allocate disk spaces.
with create_tmp_file(250 * 1024 * 1024):
assert get_current_usage() > local_fs_capacity_threshold
time.sleep(1)

@ray.remote
def foo():
time.sleep(1)
return np.random.rand(20 * 1024 * 1024) # 160 MB data

try:
ray.get(foo.remote())
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_task_put(shutdown_only):
local_fs_capacity_threshold = calculate_capacity_threshold(1 * 1024 * 1024)
ray.init(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": local_fs_capacity_threshold,
"local_fs_monitor_interval_ms": 10,
},
)

# create a temp file so that the disk size is over the threshold.
# ray.put doesn't work is that fallback allocation uses mmaped file
# that doesn't neccssary allocate disk spaces.
with create_tmp_file(250 * 1024 * 1024):
assert get_current_usage() > local_fs_capacity_threshold
time.sleep(1)

@ray.remote
def foo():
ref = ray.put(np.random.rand(20 * 1024 * 1024)) # 160 MB data
return ref

try:
ray.get(foo.remote())
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_task_args(shutdown_only):
cluster = Cluster()
cluster.add_node(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": 0,
},
resources={"out_of_memory": 1},
)
cluster.add_node(
num_cpus=1,
object_store_memory=200 * 1024 * 1024,
resources={"sufficient_memory": 1},
)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

@ray.remote
def foo():
return np.random.rand(20 * 1024 * 1024) # 160 MB data

@ray.remote
def bar(obj):
print(obj)

ref = foo.options(resources={"sufficient_memory": 1}).remote()
try:
ray.get(bar.options(resources={"out_of_memory": 1}).remote(ref))
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


@pytest.mark.skipif(platform.system() == "Windows", reason="Not targeting Windows")
def test_actor(shutdown_only):
cluster = Cluster()
cluster.add_node(
num_cpus=1,
object_store_memory=80 * 1024 * 1024,
_system_config={
"local_fs_capacity_threshold": 0,
},
resources={"out_of_memory": 1},
)
cluster.add_node(
num_cpus=1,
object_store_memory=200 * 1024 * 1024,
resources={"sufficient_memory": 1},
)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

@ray.remote
def foo():
return np.random.rand(20 * 1024 * 1024) # 160 MB data

@ray.remote
class Actor:
def __init__(self, obj):
self._obj = obj

def foo(self):
print(self._obj)

def args_ood(self, obj):
print(obj)

def return_ood(self):
return np.random.rand(20 * 1024 * 1024)

ref = foo.options(resources={"sufficient_memory": 1}).remote()
scv119 marked this conversation as resolved.
Show resolved Hide resolved
with pytest.raises(ray.exceptions.RayActorError):
a = Actor.options(resources={"out_of_memory": 0.001}).remote(ref)
ray.get(a.foo.remote())

a = Actor.options(resources={"out_of_memory": 1}).remote(1)
ray.get(a.foo.remote())
try:
ray.get(a.args_ood.remote(ref))
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)

ray.get(a.foo.remote())
try:
ray.get(a.return_ood.remote())
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.OutOfDiskError)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 2 additions & 0 deletions python/ray/tests/test_plasma_unlimited.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def test_fallback_allocation_failure(shutdown_only):
_temp_dir="/dev/shm",
_system_config={
"object_spilling_config": json.dumps(file_system_config),
# set local fs capacity to 100% so it never errors with out of disk.
"local_fs_capacity_threshold": 1,
},
)
shm_size = shutil.disk_usage("/dev/shm").total
Expand Down
Loading