Skip to content

Commit

Permalink
[core] Make ray.get(timeout=0) throw a timeout error (ray-project#35126)
Browse files Browse the repository at this point in the history

Why are these changes needed?
With telemetry tracking in place since Ray 2.3, we have not seen any significant recent usage of the timeout=0 behaviour:
image

Raw query behind firewall

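We will therefore change this behaviour as described in ray-project#28465: ray.get(timeout=0) now returns the object immediately if it is available and raises GetTimeoutError otherwise, instead of blocking.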

cc vitrioil for the original PR: https://github.com/ray-project/ray/pull/30210/files
Signed-off-by: Ricky Xu <[email protected]>

---------

Signed-off-by: Ricky Xu <[email protected]>
Co-authored-by: vitrioil <[email protected]>
Co-authored-by: Prem <[email protected]>
Signed-off-by: e428265 <[email protected]>
  • Loading branch information
3 people authored and arvind-chandra committed Aug 31, 2023
1 parent 2a0bb09 commit 8771b26
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 36 deletions.
34 changes: 5 additions & 29 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,12 +738,12 @@ def get_objects(self, object_refs: list, timeout: Optional[float] = None):
"which is not an ray.ObjectRef."
)

timeout_ms = int(timeout * 1000) if timeout else -1
timeout_ms = int(timeout * 1000) if timeout is not None else -1
data_metadata_pairs = self.core_worker.get_objects(
object_refs, self.current_task_id, timeout_ms
)
debugger_breakpoint = b""
for (data, metadata) in data_metadata_pairs:
for data, metadata in data_metadata_pairs:
if metadata:
metadata_fields = metadata.split(b",")
if len(metadata_fields) >= 2 and metadata_fields[1].startswith(
Expand Down Expand Up @@ -2464,12 +2464,9 @@ def get(
to get.
timeout (Optional[float]): The maximum amount of time in seconds to
wait before returning. Set this to None will block until the
corresponding object becomes available.
WARNING: In future ray releases ``timeout=0`` will return the object
immediately if it's available, else raise GetTimeoutError in accordance with
the above docstring. The current behavior of blocking until objects become
available of ``timeout=0`` is considered to be a bug, see
https://github.com/ray-project/ray/issues/28465.
corresponding object becomes available. Setting ``timeout=0`` will
return the object immediately if it's available, else raise
GetTimeoutError in accordance with the above docstring.
Returns:
A Python object or a list of Python objects.
Expand All @@ -2480,26 +2477,6 @@ def get(
Exception: An exception is raised if the task that created the object
or that created one of the objects raised an exception.
"""
if timeout == 0:
if os.environ.get("RAY_WARN_RAY_GET_TIMEOUT_ZERO", "1") == "1":
import warnings

warnings.warn(
(
"Please use timeout=None if you expect ray.get() to block. "
"Setting timeout=0 in future ray releases will raise "
"GetTimeoutError if the objects references are not available. "
"You could suppress this warning by setting "
"RAY_WARN_RAY_GET_TIMEOUT_ZERO=0."
),
UserWarning,
)

# Record this usage in telemetry
import ray._private.usage.usage_lib as usage_lib

usage_lib.record_extra_usage_tag(usage_lib.TagKey.RAY_GET_TIMEOUT_ZERO, "True")

worker = global_worker
worker.check_connected()

Expand Down Expand Up @@ -2710,7 +2687,6 @@ def wait(
worker.check_connected()
# TODO(swang): Check main thread.
with profiling.profile("ray.wait"):

# TODO(rkn): This is a temporary workaround for
# https://github.com/ray-project/ray/issues/997. However, it should be
# fixed in Arrow instead of here.
Expand Down
4 changes: 4 additions & 0 deletions python/ray/tests/test_basic_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ def test_get_with_timeout(ray_start_regular_shared):
with pytest.raises(TimeoutError):
ray.get(result_id, timeout=0.1)

# timeout of 0 should raise an error
with pytest.raises(GetTimeoutError):
ray.get(result_id, timeout=0)

# Check that a subsequent get() returns early.
ray.get(signal.send.remote())
start = time.time()
Expand Down
6 changes: 3 additions & 3 deletions python/ray/tests/test_object_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def test_spill_objects_automatically(fs_only_object_spilling_config, shutdown_on
index = random.choice(list(range(buffer_length)))
ref = replay_buffer[index]
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["address"])

Expand Down Expand Up @@ -359,7 +359,7 @@ def test_unstable_spill_objects_automatically(unstable_spilling_config, shutdown
index = random.choice(list(range(buffer_length)))
ref = replay_buffer[index]
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["address"])

Expand Down Expand Up @@ -397,7 +397,7 @@ def test_slow_spill_objects_automatically(slow_spilling_config, shutdown_only):
index = random.choice(list(range(buffer_length)))
ref = replay_buffer[index]
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["address"])

Expand Down
6 changes: 3 additions & 3 deletions python/ray/tests/test_object_spilling_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config, shutdown_o
# Do random sampling.
for _ in range(200):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, arr)

# After all, make sure all objects are killed without race condition.
Expand Down Expand Up @@ -126,7 +126,7 @@ def create_objects(self):
# Do random sampling.
for _ in range(200):
ref = random.choice(self.replay_buffer)
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, arr)

a = Actor.remote()
Expand Down Expand Up @@ -288,7 +288,7 @@ def test_fusion_objects(fs_only_object_spilling_config, shutdown_only):
index = random.choice(list(range(buffer_length)))
ref = replay_buffer[index]
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, solution)

is_test_passing = False
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_object_spilling_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
if random.randint(0, 9) < 5:
for _ in range(5):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, arr)
assert_no_thrashing(address["address"])

Expand Down

0 comments on commit 8771b26

Please sign in to comment.