Skip to content

Commit

Permalink
[aDAG] support buffered input (#47272)
Browse files Browse the repository at this point in the history
Based on https://docs.google.com/document/d/1Ka_HFwPBNIY1u3kuroHOSZMEQ8AgwpYciZ4n08HJ0Xc/edit

When there are many in-flight requests (i.e., inputs are pipelined into the DAG), two problems occur:

Input submitter timeout. InputSubmitter.write() waits until the buffer has been read by downstream tasks. Since the timeout clock starts as soon as InputSubmitter.write() is called, later requests are likely to time out when there are many in-flight requests.
Pipeline bubble. The output fetcher doesn't read the channel until CompiledDagRef.get is called. This means the upstream task (actor 2) is blocked until the driver calls .get, even though it could otherwise keep executing tasks.
This PR solves these problems by providing multiple buffers per shm channel. Note that buffering is not yet supported for NCCL (we can add it when we overlap compute/comm).

Main changes

Introduce BufferedSharedMemoryChannel, which allows creating multiple buffers (10 by default). Reads and writes are done in a round-robin manner.
When you have more in-flight requests than the buffer size, the DAG can still hit a timeout error. To make debugging easy and the behavior straightforward, we introduce the max_buffered_inputs_ argument (exposed in this commit's code as _max_inflight_executions). If more than max_buffered_inputs_ requests are submitted to the DAG without calling ray.get, an exception (RayAdagCapacityExceeded) is raised immediately.
  • Loading branch information
rkooo567 authored Sep 10, 2024
1 parent 0773760 commit f40313b
Show file tree
Hide file tree
Showing 18 changed files with 578 additions and 247 deletions.
1 change: 1 addition & 0 deletions doc/source/ray-core/api/exceptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Exceptions
ray.exceptions.ObjectReconstructionFailedLineageEvictedError
ray.exceptions.RayChannelError
ray.exceptions.RayChannelTimeoutError
ray.exceptions.RayAdagCapacityExceeded
ray.exceptions.RuntimeEnvSetupError
ray.exceptions.CrossLanguageError
ray.exceptions.RaySystemError
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dag/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ py_test_module_list(
)

py_test_module_list(
size = "large",
size = "enormous",
files = [
"tests/experimental/test_accelerated_dag.py",
],
Expand Down
66 changes: 52 additions & 14 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def do_exec_tasks(
if done:
break
for operation in schedule:
print("SANG-TODO operation: ", operation)
done = tasks[operation.exec_task_idx].exec_operation(
self, operation.type
)
Expand Down Expand Up @@ -597,6 +598,7 @@ def __init__(
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
):
"""
Args:
Expand Down Expand Up @@ -625,6 +627,10 @@ def __init__(
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
max_inflight_executions: The maximum number of in-flight executions that
are allowed to be sent to this DAG. Before submitting more requests,
the caller is responsible for calling ray.get to get the result,
otherwise, RayAdagCapacityExceeded is raised.
Returns:
Channel: A wrapper around ray.ObjectRef.
Expand All @@ -633,29 +639,37 @@ def __init__(

ctx = DAGContext.get_current()

self._enable_asyncio: bool = enable_asyncio
self._fut_queue = asyncio.Queue()
self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
# TODO(rui): consider unify it with asyncio_max_queue_size
self._max_buffered_results: Optional[int] = max_buffered_results
if self._max_buffered_results is None:
self._max_buffered_results = ctx.max_buffered_results
self._max_inflight_executions = max_inflight_executions
if self._max_inflight_executions is None:
self._max_inflight_executions = ctx.max_inflight_executions
self._dag_id = uuid.uuid4().hex
self._execution_timeout: Optional[float] = execution_timeout
if self._execution_timeout is None:
self._execution_timeout = ctx.execution_timeout
self._buffer_size_bytes: Optional[int] = buffer_size_bytes
if self._buffer_size_bytes is None:
self._buffer_size_bytes = ctx.buffer_size_bytes

self._default_type_hint: ChannelOutputType = SharedMemoryType(
self._buffer_size_bytes
self._buffer_size_bytes,
# We conservatively set num_shm_buffers to _max_inflight_executions.
# It means that the DAG can be underutilized, but it guarantees there's
# no false positive timeouts.
num_shm_buffers=self._max_inflight_executions,
)
if not isinstance(self._buffer_size_bytes, int) or self._buffer_size_bytes <= 0:
raise ValueError(
"`buffer_size_bytes` must be a positive integer, found "
f"{self._buffer_size_bytes}"
)

self._enable_asyncio: bool = enable_asyncio
self._fut_queue = asyncio.Queue()
self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
# TODO(rui): consider unify it with asyncio_max_queue_size
self._max_buffered_results: Optional[int] = max_buffered_results
if self._max_buffered_results is None:
self._max_buffered_results = ctx.max_buffered_results
# Used to ensure that the future returned to the
# caller corresponds to the correct DAG output. I.e.
# order of futures added to fut_queue should match the
Expand Down Expand Up @@ -721,7 +735,7 @@ def __init__(
self._execution_index: int = 0
# The maximum index of finished executions.
# All results with higher indexes have not been generated yet.
self._max_execution_index: int = -1
self._max_finished_execution_index: int = -1
self._result_buffer: Dict[int, Any] = {}

def _get_creator_or_proxy_actor() -> "ray.actor.ActorHandle":
Expand Down Expand Up @@ -765,6 +779,12 @@ def _get_creator_or_proxy_actor() -> "ray.actor.ActorHandle":

self._creator_or_proxy_actor = _get_creator_or_proxy_actor()

def increment_max_finished_execution_index(self) -> None:
    """Advance the max finished execution index by one.

    The index is used to figure out the max number of in-flight
    requests to the DAG.
    """
    advanced = self._max_finished_execution_index + 1
    self._max_finished_execution_index = advanced

@property
def has_single_output(self):
return self._has_single_output
Expand Down Expand Up @@ -1857,6 +1877,20 @@ def run(self):
monitor.start()
return monitor

def raise_if_too_many_inflight_requests(self):
    """Raise if the in-flight request count exceeds the configured limit.

    The count is the difference between the execution index and the max
    finished execution index.

    Raises:
        ray.exceptions.RayAdagCapacityExceeded: If that difference is
            greater than ``_max_inflight_executions``.
    """
    num_in_flight_requests = (
        self._execution_index - self._max_finished_execution_index
    )
    over_capacity = num_in_flight_requests > self._max_inflight_executions
    if not over_capacity:
        # Still within the limit; nothing to report.
        return

    raise ray.exceptions.RayAdagCapacityExceeded(
        f"There are {num_in_flight_requests} in-flight requests which "
        "is more than specified _max_inflight_executions of the dag: "
        f"{self._max_inflight_executions}. Retrieve the output using "
        "ray.get before submitting more requests or increase "
        "`max_inflight_executions`. "
        "`adag.experimental_compile(_max_inflight_executions=...)`"
    )

def _execute_until(
self,
execution_index: int,
Expand Down Expand Up @@ -1885,10 +1919,10 @@ def _execute_until(
if timeout is None:
timeout = ctx.retrieval_timeout

while self._max_execution_index < execution_index:
if self._max_execution_index + 1 == execution_index:
while self._max_finished_execution_index < execution_index:
if self._max_finished_execution_index + 1 == execution_index:
# Directly fetch and return without buffering
self._max_execution_index += 1
self.increment_max_finished_execution_index()
return self._dag_output_fetcher.read(timeout)
# Otherwise, buffer the result
if len(self._result_buffer) >= self._max_buffered_results:
Expand All @@ -1897,10 +1931,10 @@ def _execute_until(
f"buffered results is {self._max_buffered_results}; call ray.get() "
"on previous CompiledDAGRefs to free them up from buffer."
)
self._max_execution_index += 1
self.increment_max_finished_execution_index()
start_time = time.monotonic()
self._result_buffer[
self._max_execution_index
self._max_finished_execution_index
] = self._dag_output_fetcher.read(timeout)
if timeout != -1:
timeout -= time.monotonic() - start_time
Expand Down Expand Up @@ -1946,6 +1980,7 @@ def execute(
else:
inp = RayDAGArgs(args=args, kwargs=kwargs)

self.raise_if_too_many_inflight_requests()
self._dag_submitter.write(inp, self._execution_timeout)

ref = CompiledDAGRef(self, self._execution_index)
Expand Down Expand Up @@ -2004,6 +2039,7 @@ async def execute_async(
else:
inp = RayDAGArgs(args=args, kwargs=kwargs)

self.raise_if_too_many_inflight_requests()
await self._dag_submitter.write(inp)
# Allocate a future that the caller can use to get the result.
fut = asyncio.Future()
Expand Down Expand Up @@ -2039,13 +2075,15 @@ def build_compiled_dag_from_ray_dag(
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
) -> "CompiledDAG":
compiled_dag = CompiledDAG(
execution_timeout,
buffer_size_bytes,
enable_asyncio,
asyncio_max_queue_size,
max_buffered_results,
max_inflight_executions,
)

def _build_compiled_dag(node):
Expand Down
6 changes: 6 additions & 0 deletions python/ray/dag/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
# Default cap on buffered results. With the default of 1000 results and the
# default buffer size of 1 MB, buffered results use at most 1 GB of memory.
DEFAULT_MAX_BUFFERED_RESULTS = int(os.getenv("RAY_DAG_max_buffered_results", 1000))
# Default cap on the number of in-flight executions that can be submitted
# before their output is consumed.
DEFAULT_MAX_INFLIGHT_EXECUTIONS = int(
    os.getenv("RAY_DAG_max_inflight_executions", 10)
)


@DeveloperAPI
Expand Down Expand Up @@ -60,6 +65,7 @@ class DAGContext:
buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES
asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE
max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS
max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS

@staticmethod
def get_current() -> "DAGContext":
Expand Down
6 changes: 6 additions & 0 deletions python/ray/dag/dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def experimental_compile(
enable_asyncio: bool = False,
_asyncio_max_queue_size: Optional[int] = None,
_max_buffered_results: Optional[int] = None,
_max_inflight_executions: Optional[int] = None,
) -> "ray.dag.CompiledDAG":
"""Compile an accelerated execution path for this DAG.
Expand All @@ -186,6 +187,10 @@ def experimental_compile(
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
_max_inflight_executions: The maximum number of in-flight requests that
are allowed to be sent to this DAG. Before submitting more requests,
the caller is responsible for calling ray.get to clear finished
in-flight requests.
Returns:
A compiled DAG.
Expand Down Expand Up @@ -218,6 +223,7 @@ def experimental_compile(
enable_asyncio,
_asyncio_max_queue_size,
_max_buffered_results,
_max_inflight_executions,
)

def execute(
Expand Down
3 changes: 3 additions & 0 deletions python/ray/dag/dag_node_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def __init__(
self.exec_task_idx = exec_task_idx
self.type = operation_type

def __repr__(self):
    """Return a short description with the task index and operation type."""
    task_idx, op_type = self.exec_task_idx, self.type
    return f"(Task idx: {task_idx}, Type: {op_type})"


@total_ordering
class _DAGOperationGraphNode:
Expand Down
Loading

0 comments on commit f40313b

Please sign in to comment.