Skip to content

Commit

Permalink
[Revert Please] Support core worker APIs and a generator.
Browse files Browse the repository at this point in the history
Signed-off-by: SangBin Cho <[email protected]>
  • Loading branch information
rkooo567 committed May 14, 2023
1 parent 05f468a commit 122b705
Show file tree
Hide file tree
Showing 9 changed files with 1 addition and 379 deletions.
2 changes: 1 addition & 1 deletion python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def issubclass_safe(obj: Any, cls_: type) -> bool:
"num_returns": Option(
(int, str, type(None)),
lambda x: None
if (x is None or x == "dynamic")
if (x is None or x == "dynamic" or x > 0)
else "The keyword 'num_returns' only accepts None, a non-negative integer, or "
'"dynamic" (for generators)',
default_value=1,
Expand Down
169 changes: 0 additions & 169 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ from ray.exceptions import (
AsyncioActorExit,
PendingCallsLimitExceeded,
RpcError,
ObjectRefStreamEoFError,
)
from ray._private import external_storage
from ray.util.scheduling_strategies import (
Expand Down Expand Up @@ -199,145 +198,6 @@ class ObjectRefGenerator:
return len(self._refs)


class StreamingObjectRefGenerator:
def __init__(self, generator_ref, worker):
# The reference to a generator task.
self._generator_ref = generator_ref
# The last time generator task has completed.
self._generator_task_completed_time = None
# The exception raised from a generator task.
self._generator_task_exception = None
# Ray's worker class. ray._private.worker.global_worker
self.worker = worker
assert hasattr(worker, "core_worker")
self.worker.core_worker.create_object_ref_stream(self._generator_ref)

def __iter__(self):
return self

def __next__(self):
"""Waits until a next ref is available and returns the object ref.
Raises StopIteration if there's no more objects
to generate.
The object ref will contain an exception if the task fails.
When the generator task returns N objects, it can return
up to N + 1 objects (if there's a system failure, the
last object will contain a system level exception).
"""
return self._next()

def _next(
self,
timeout_s: float = -1,
sleep_interval_s: float = 0.0001,
unexpected_network_failure_timeout_s: float = 30):
"""Waits for timeout_s and returns the object ref if available.
If an object is not available within the given timeout, it
returns a nil object reference.
If -1 timeout is provided, it means it waits infinitely.
Waiting is implemented as busy waiting. You can control
the busy waiting interval via sleep_interval_s.
Raises StopIteration if there's no more objects
to generate.
The object ref will contain an exception if the task fails.
When the generator task returns N objects, it can return
up to N + 1 objects (if there's a system failure, the
last object will contain a system level exception).
Args:
timeout_s: If the next object is not ready within
this timeout, it returns the nil object ref.
sleep_interval_s: busy waiting interval.
unexpected_network_failure_timeout_s: If the
task is finished, but the next ref is not
available within this time, it will hard fail
the generator.
"""
obj = self._handle_next()
last_time = time.time()

# The generator ref will be None if the task succeeds.
# It will contain an exception if the task fails by
# a system error.
while obj.is_nil():
if self._generator_task_exception:
# The generator task has failed already.
# We raise StopIteration
# to conform the next interface in Python.
raise StopIteration
else:
# Otherwise, we should ray.get on the generator
# ref to find if the task has a system failure.
# Return the generator ref that contains the system
# error as soon as possible.
r, _ = ray.wait([self._generator_ref], timeout=0)
if len(r) > 0:
try:
ray.get(r)
except Exception as e:
# If it has failed, return the generator task ref
# so that the ref will raise an exception.
self._generator_task_exception = e
return self._generator_ref
finally:
if self._generator_task_completed_time is None:
self._generator_task_completed_time = time.time()

# Currently, since the ordering of intermediate result report
# is not guaranteed, it is possible that althoug the task
# has succeeded, all of the object references are not reported
# (e.g., when there are network failures).
# If all the object refs are not reported to the generator
# within 30 seconds, we consider is as an unreconverable error.
if self._generator_task_completed_time:
if (time.time() - self._generator_task_completed_time
> unexpected_network_failure_timeout_s):
# It means the next wasn't reported although the task
# has been terminated 30 seconds ago.
self._generator_task_exception = AssertionError
assert False, "Unexpected network failure occured."

if timeout_s != -1 and time.time() - last_time > timeout_s:
return ObjectRef.nil()

# 100us busy waiting
time.sleep(sleep_interval_s)
obj = self._handle_next()
return obj

def _handle_next(self):
try:
if hasattr(self.worker, "core_worker"):
obj = self.worker.core_worker.async_read_object_ref_stream(
self._generator_ref)
return obj
else:
raise ValueError(
"Cannot access the core worker. "
"Did you already shutdown Ray via ray.shutdown()?")
except ObjectRefStreamEoFError:
raise StopIteration

def __del__(self):
if hasattr(self.worker, "core_worker"):
# NOTE: This can be called multiple times
# because python doesn't guarantee __del__ is called
# only once.
self.worker.core_worker.delete_object_ref_stream(self._generator_ref)

def __getstate__(self):
raise TypeError(
"Serialization of the StreamingObjectRefGenerator "
"is now allowed")


cdef int check_status(const CRayStatus& status) nogil except -1:
if status.ok():
return 0
Expand All @@ -349,8 +209,6 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
raise ObjectStoreFullError(message)
elif status.IsOutOfDisk():
raise OutOfDiskError(message)
elif status.IsObjectRefStreamEoF():
raise ObjectRefStreamEoFError(message)
elif status.IsInterrupted():
raise KeyboardInterrupt()
elif status.IsTimedOut():
Expand Down Expand Up @@ -3256,33 +3114,6 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker() \
.RecordTaskLogEnd(out_end_offset, err_end_offset)

def create_object_ref_stream(self, ObjectRef generator_id):
cdef:
CObjectID c_generator_id = generator_id.native()

CCoreWorkerProcess.GetCoreWorker().CreateObjectRefStream(c_generator_id)

def delete_object_ref_stream(self, ObjectRef generator_id):
cdef:
CObjectID c_generator_id = generator_id.native()

CCoreWorkerProcess.GetCoreWorker().DelObjectRefStream(c_generator_id)

def async_read_object_ref_stream(self, ObjectRef generator_id):
cdef:
CObjectID c_generator_id = generator_id.native()
CObjectReference c_object_ref

check_status(
CCoreWorkerProcess.GetCoreWorker().AsyncReadObjectRefStream(
c_generator_id, &c_object_ref))
return ObjectRef(
c_object_ref.object_id(),
c_object_ref.owner_address().SerializeAsString(),
"",
# Already added when the ref is updated.
skip_adding_local_ref=True)

cdef void async_callback(shared_ptr[CRayObject] obj,
CObjectID object_ref,
void *user_callback) with gil:
Expand Down
4 changes: 0 additions & 4 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,6 @@ def __str__(self):
return error_msg


class ObjectRefStreamEoFError(RayError):
pass


@PublicAPI
class ObjectStoreFullError(RayError):
"""Indicates that the object store is full.
Expand Down
4 changes: 0 additions & 4 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
@staticmethod
CRayStatus NotFound()

@staticmethod
CRayStatus ObjectRefStreamEoF()

c_bool ok()
c_bool IsOutOfMemory()
c_bool IsKeyError()
Expand All @@ -121,7 +118,6 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
c_bool IsObjectUnknownOwner()
c_bool IsRpcError()
c_bool IsOutOfResource()
c_bool IsObjectRefStreamEoF()

c_string ToString()
c_string CodeAsString()
Expand Down
5 changes: 0 additions & 5 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CObjectID& return_id,
shared_ptr[CRayObject] *return_object,
const CObjectID& generator_id)
void DelObjectRefStream(const CObjectID &generator_id)
void CreateObjectRefStream(const CObjectID &generator_id)
CRayStatus AsyncReadObjectRefStream(
const CObjectID &generator_id,
CObjectReference *object_ref_out)
CObjectID AllocateDynamicReturnId()

CJobID GetCurrentJobId()
Expand Down
1 change: 0 additions & 1 deletion python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ py_test_module_list(
"test_gcs_fault_tolerance.py",
"test_gcs_utils.py",
"test_generators.py",
"test_streaming_generator.py",
"test_metrics_agent.py",
"test_metrics_head.py",
"test_component_failures_2.py",
Expand Down
141 changes: 0 additions & 141 deletions python/ray/tests/test_streaming_generator.py

This file was deleted.

Loading

0 comments on commit 122b705

Please sign in to comment.