[data] New executor [7/n]: bare-bones implementation of StreamingExecutor #31579
Conversation
Signed-off-by: Eric Liang <[email protected]>
Might be missing something, but I wonder if we really need the `inputs_done` interface? I think this is needed for all-to-all operations and to flush at the end of the job, right?

For all-to-all, I think we could move the logic inside the `bulk_fn` that's passed to the operator. For example, we can call the `bulk_fn` on each input added, and `bulk_fn` would only run the shuffle once it's received the expected number of inputs. This also better matches what the operator would look like in the windowed shuffle case.

For the flush case, we can track the done inputs in the op state as we are right now, but then only call `op.flush()` once per op instead of once per op input.

Fine leaving this cleanup to later, but it occurred to me that it may simplify the current code.
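As a rough sketch of the idea above (the class name and the exact `bulk_fn` signature here are assumptions for illustration, not Ray's actual API): the operator buffers each added input and only invokes the shuffle once the expected count has arrived, which removes the need for a separate `inputs_done` signal in the all-to-all case.

```python
from typing import Any, Callable, List, Optional


class BufferingAllToAll:
    """Sketch: run bulk_fn only once the expected number of inputs arrive.

    Illustrative only; not the actual Ray operator implementation.
    """

    def __init__(
        self,
        bulk_fn: Callable[[List[Any]], List[Any]],
        expected_inputs: int,
    ):
        self._bulk_fn = bulk_fn
        self._expected = expected_inputs
        self._buffer: List[Any] = []

    def add_input(self, ref: Any) -> Optional[List[Any]]:
        self._buffer.append(ref)
        if len(self._buffer) == self._expected:
            # All inputs received: run the shuffle exactly once.
            return self._bulk_fn(self._buffer)
        return None  # Still waiting for more inputs.
```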
return selected

def dispatch_next_task(op_state: OpState) -> None:
Should this just be a method of OpState?
Done.
@@ -78,6 +79,7 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:

def inputs_done(self, input_index: int) -> None:
    self._execution_state.inputs_done(input_index)
    self._inputs_done = True
Is there an assumption here that there is only one input (if so, add a note)?
Fixed.
self._completed = True

def completed(self) -> bool:
    return self._completed
Why don't we need to check whether self.has_next() as we do in the map operator?
Done (moved to physical op class).
@@ -94,6 +96,11 @@ def get_work_refs(self) -> List[ray.ObjectRef]:

def notify_work_completed(self, task: ray.ObjectRef) -> None:
    self._execution_state.work_completed(task)

def completed(self) -> bool:
    return (
        self._inputs_done and len(self.get_work_refs()) == 0 and not self.has_next()
    )
Seems like this definition could be shared across the different operators?
Done.
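A minimal sketch of sharing that definition in a common base class (the class name and stub bodies here are illustrative, not Ray's actual class hierarchy):

```python
from typing import Any, List


class PhysicalOperator:
    """Sketch: a completion check shared across operator subclasses."""

    def __init__(self):
        self._inputs_done = False

    def get_work_refs(self) -> List[Any]:
        return []  # Subclasses return their outstanding task refs.

    def has_next(self) -> bool:
        return False  # Subclasses report whether outputs are buffered.

    def completed(self) -> bool:
        # Shared definition: all inputs delivered, no in-flight work,
        # and no buffered outputs left to consume.
        return (
            self._inputs_done
            and len(self.get_work_refs()) == 0
            and not self.has_next()
        )
```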
op_state = OpState(o2)
o2.add_input = MagicMock()
op_state.inqueues[0].append("dummy1")
dispatch_next_task(op_state)
Suggest running this multiple times so we can test indices other than 0.
Added a test for multiple inputs, and a TODO for multiple indices.
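A sketch of what exercising a non-zero input index could look like, following the reviewer's suggestion. The `OpState` and `dispatch_next_task` here are simplified stand-ins for the real ones, kept only so the example is self-contained:

```python
from unittest.mock import MagicMock


class OpState:
    """Simplified stand-in for the real OpState (illustrative only)."""

    def __init__(self, op, num_inputs: int = 2):
        self.op = op
        self.inqueues = [[] for _ in range(num_inputs)]


def dispatch_next_task(op_state: OpState) -> None:
    # Simplified sketch: pop from the first non-empty input queue and
    # forward its index, so indices other than 0 get exercised too.
    for i, queue in enumerate(op_state.inqueues):
        if queue:
            op_state.op.add_input(queue.pop(0), input_index=i)
            return


op = MagicMock()
state = OpState(op)
state.inqueues[1].append("dummy2")  # only queue 1 has data
dispatch_next_task(state)
op.add_input.assert_called_once_with("dummy2", input_index=1)
```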
Updated. Regarding removing the completion method, I don't think that's possible if we want to support operators with unknown output size in general. Currently, we have a method that returns an estimate of the number of outputs for the progress bar, but this is allowed to return None for operators that don't know their number of outputs at planning time.
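The estimate interface described above might be sketched like this (the method name `num_outputs_total` is an assumption used for illustration):

```python
from typing import Optional


class PhysicalOp:
    """Sketch: output-count estimate used to size the progress bar."""

    def num_outputs_total(self) -> Optional[int]:
        # None means the operator cannot know its output size at
        # planning time, which is why a separate completed() signal
        # is still needed.
        return None


class FixedSizeOp(PhysicalOp):
    """An operator whose output size is known at planning time."""

    def __init__(self, n: int):
        self._n = n

    def num_outputs_total(self) -> Optional[int]:
        return self._n
```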
Hmm I see, yeah, I was thinking we would just flush once enough outputs have been accumulated, but I guess it ends up being the same thing. It's more minor, but maybe we could at least change the signature from `inputs_done(input_index)` to `inputs_done()`?
Yup, good point, I can't think of where an operator would need the index for the done signal. Removed.
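The signature change being discussed could look like this (a sketch based on the diff above, with illustrative class names, not the final code):

```python
class OperatorBefore:
    def inputs_done(self, input_index: int) -> None:
        """Old form: the done signal carried the input index."""


class OperatorAfter:
    def inputs_done(self) -> None:
        """New form: no operator needs the index, so it is dropped."""
```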
LGTM!
LGTM overall, mostly questions and impl/comment/test nits.
LGTM
All comments addressed, ptal.
Why are these changes needed?
Initial implementation of ray-project/enhancements#18, dependent on #30903
Streaming execution can be toggled with the following env var: RAY_DATASET_USE_STREAMING_EXECUTOR=0|1.

Initial PR TODOs:
Future TODOs:
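For illustration, the toggle from the description can be read like this. The env var name comes from the PR; the helper function and its default of "0" (off) are assumptions, and where Ray actually reads the variable is not shown in this excerpt:

```python
import os


def use_streaming_executor() -> bool:
    # Sketch: treat "1" as on and anything else (or unset) as off.
    return os.environ.get("RAY_DATASET_USE_STREAMING_EXECUTOR", "0") == "1"
```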