[WIP] Bulk executor initial implementation #30903

Merged · 152 commits · Jan 25, 2023
Changes from 59 commits
Commits (152)
9e4451e
copy prototype
ericl Dec 5, 2022
8924a89
cleanup
ericl Dec 5, 2022
44578ce
wip compatibility
ericl Dec 5, 2022
e0a346a
add basic wiring
ericl Dec 5, 2022
22504c0
works
ericl Dec 6, 2022
0b26570
fix up split handling
ericl Dec 6, 2022
3f0e0cb
refactor legacy compat package
ericl Dec 6, 2022
eaa46b0
todo move operators fully
ericl Dec 7, 2022
3162f44
reorganize operators
ericl Dec 7, 2022
2136170
stub out actors impl
ericl Dec 7, 2022
38ae324
improve legacy integration
ericl Dec 7, 2022
9f24555
add str
ericl Dec 7, 2022
f33c772
add own block propagation
ericl Dec 7, 2022
bf5288f
rename to tasks
ericl Dec 7, 2022
f5efe2c
add basic stats
ericl Dec 13, 2022
e5790dc
implement alltoall
ericl Dec 13, 2022
5c7e490
revert format change
ericl Dec 14, 2022
d6bee3c
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Dec 14, 2022
1eb5519
fixme
ericl Dec 14, 2022
ec66fd0
fix
ericl Dec 14, 2022
5aa082b
fix own propagation
ericl Dec 14, 2022
c8f8c79
add debug mem metrics
ericl Dec 14, 2022
5b2f7ec
fix block clearing for datasetpipeline
ericl Dec 15, 2022
00025f5
add config
ericl Dec 15, 2022
f8570ee
misc test fixes
ericl Dec 15, 2022
edba805
fix split memory free
ericl Dec 15, 2022
683f4a1
workaround segfault
ericl Dec 16, 2022
a9c0bdf
wip
ericl Dec 16, 2022
db332e1
wip towards stats passing
ericl Dec 16, 2022
07c0c69
improve logs
ericl Dec 16, 2022
e78e800
use bulk wait for performance
ericl Dec 16, 2022
0fa159e
add ctrl-c support
ericl Dec 16, 2022
2a9e0a5
rename
ericl Dec 16, 2022
7573f99
rename node to op
ericl Dec 16, 2022
c00f867
wip
ericl Dec 18, 2022
0ae94c7
Support block bundling
jianoaix Dec 19, 2022
8c29abf
Block bundling: polish
jianoaix Dec 19, 2022
e6da60e
add mem tracing module
ericl Dec 19, 2022
a4faedc
flag protect tracing
ericl Dec 19, 2022
6b9105a
Merge branch 'bulk-executor' of github.com:ericl/ray into bulk-executor
ericl Dec 20, 2022
a676598
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Dec 20, 2022
3427f90
add interfaces
ericl Dec 20, 2022
224854c
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Dec 20, 2022
ebf21e3
remove meta
ericl Dec 20, 2022
b5074e0
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Dec 21, 2022
51ceb36
add docstrings
ericl Dec 21, 2022
4dad697
Merge branch 'interfaces-1' into bulk-executor
ericl Dec 21, 2022
d0769e3
remove input metadata
ericl Dec 21, 2022
9695a27
remove hanging
ericl Dec 21, 2022
aab996e
fix gc failures
ericl Dec 21, 2022
92f8b61
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Dec 21, 2022
af3308b
fix size est tests
ericl Dec 21, 2022
a983e52
fix stats uuid handling
ericl Dec 21, 2022
da1ab6a
Merge branch 'bulk-executor' of github.com:ericl/ray into bulkexecuto…
jianoaix Dec 21, 2022
01d4b2c
Block bundling: add more tests
jianoaix Dec 21, 2022
808c82f
fix handling of randomize block stage ownership
ericl Dec 22, 2022
fa7e3ec
Merge branch 'bulk-executor' of github.com:ericl/ray into bulk-executor
ericl Dec 22, 2022
44fb0f7
handle zero
ericl Dec 22, 2022
964aaeb
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Dec 22, 2022
4567839
wip
ericl Dec 22, 2022
23eea81
completion guarantee comments
ericl Dec 22, 2022
beba2a6
add assert too
ericl Dec 22, 2022
887e4b3
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Dec 23, 2022
f83edd9
add operators
ericl Dec 23, 2022
bc8f342
add test execution
ericl Dec 23, 2022
50b456a
wip
ericl Dec 23, 2022
bdfef58
wip
ericl Dec 23, 2022
d810c61
add test todos
ericl Dec 23, 2022
91b2848
add data stats todo
ericl Dec 23, 2022
9e706ad
Merge remote-tracking branch 'upstream/master' into operators
ericl Dec 23, 2022
d3e370a
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Dec 23, 2022
d4f514a
add basic tests
ericl Dec 23, 2022
cde12ec
add note
ericl Dec 23, 2022
b95a356
typo
ericl Jan 3, 2023
1f15cd9
Merge branch 'operators' into bulk-executor
ericl Jan 3, 2023
6129d66
fix tests
ericl Jan 3, 2023
ea62366
optimize function arg passing
ericl Jan 3, 2023
ab4e5d7
Merge remote-tracking branch 'upstream/master' into operators
ericl Jan 3, 2023
510e748
Merge branch 'operators' into bulk-executor
ericl Jan 3, 2023
a6e8a18
comments
ericl Jan 3, 2023
7eec78a
Merge branch 'operators' into bulk-executor
ericl Jan 3, 2023
bc021c9
comments 2
ericl Jan 3, 2023
cd0a902
Merge branch 'operators' into bulk-executor
ericl Jan 3, 2023
718a32e
cleanup hierarchy
ericl Jan 4, 2023
f3d8a50
or zero
ericl Jan 4, 2023
3228401
Apply suggestions from code review
ericl Jan 4, 2023
d1a98d6
Merge branch 'operators' of github.com:ericl/ray into operators
ericl Jan 4, 2023
1a8dc02
min rows per bundle
ericl Jan 4, 2023
203720e
fix tests
ericl Jan 4, 2023
f9850b4
Merge branch 'operators' into bulk-executor
ericl Jan 4, 2023
e1d2e89
last comment
ericl Jan 4, 2023
690cb1d
Merge branch 'operators' into bulk-executor
ericl Jan 4, 2023
bf4ef1d
add min rows
ericl Jan 4, 2023
0807aa9
Merge branch 'operators' into bulk-executor
ericl Jan 4, 2023
f7cd953
fix tests
ericl Jan 4, 2023
1314dfb
Merge branch 'operators' into bulk-executor
ericl Jan 4, 2023
4d94aed
Merge remote-tracking branch 'upstream/master' into operators
ericl Jan 4, 2023
30c4486
Merge branch 'operators' into bulk-executor
ericl Jan 4, 2023
1c83066
add exec impl
ericl Jan 4, 2023
7cbfea4
lint
ericl Jan 4, 2023
f55101d
fix tests
ericl Jan 4, 2023
3410619
lint
ericl Jan 4, 2023
9f57758
check extra metrics
ericl Jan 4, 2023
f20fdc6
pull in optimization
ericl Jan 4, 2023
0830f1e
add all to all test
ericl Jan 5, 2023
a607a3a
Merge branch 'part-4' into bulk-executor
ericl Jan 5, 2023
e3a6dd7
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Jan 5, 2023
19f0664
legacy compat
ericl Jan 5, 2023
be8b0d5
add split
ericl Jan 5, 2023
341acb9
off by default
ericl Jan 5, 2023
8dacdef
sanity test
ericl Jan 5, 2023
6ea1cb8
update
ericl Jan 5, 2023
9ac348b
Merge branch 'legacy-compat' into bulk-executor
ericl Jan 5, 2023
597614a
wip port the old streaming prototype
ericl Jan 6, 2023
dbc2ebd
fix comments
ericl Jan 6, 2023
458552f
add assert
ericl Jan 9, 2023
53eb19d
Merge branch 'legacy-compat' into bulk-executor
ericl Jan 9, 2023
a16e2dc
Apply suggestions from code review
ericl Jan 10, 2023
64849be
fix type
ericl Jan 10, 2023
88cfd35
Merge remote-tracking branch 'upstream/master' into legacy-compat
ericl Jan 10, 2023
965c0de
fix test
ericl Jan 10, 2023
1dfe172
revert
ericl Jan 10, 2023
7d8c2c9
Merge branch 'legacy-compat' into bulk-executor
ericl Jan 10, 2023
25d0bb2
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Jan 10, 2023
d789c9c
flip on
ericl Jan 10, 2023
6cbbe8a
remove
ericl Jan 10, 2023
a119fc4
Merge remote-tracking branch 'upstream/master' into bulk-executor
ericl Jan 10, 2023
d4d2d0a
try removing buffer change
ericl Jan 10, 2023
4462055
remove streaming executor
ericl Jan 10, 2023
723241c
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 13, 2023
64a1453
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 17, 2023
2d554c5
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 18, 2023
13852ec
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 18, 2023
12c9eef
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 18, 2023
aef0530
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 18, 2023
4242d72
extra metric
jianoaix Jan 18, 2023
6540381
add __init__.py to operator package
jianoaix Jan 19, 2023
c446c58
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 19, 2023
7c28ae3
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 19, 2023
3467714
ray client block splitting
jianoaix Jan 19, 2023
a3bfe5b
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 20, 2023
99e54da
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 20, 2023
49691ae
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 23, 2023
9358e1b
fix
jianoaix Jan 23, 2023
199fe0e
fix stats
jianoaix Jan 23, 2023
c6e6a63
fix actorpool requiring num_cpus
jianoaix Jan 23, 2023
06b1ad7
fix bazel test
jianoaix Jan 24, 2023
3867061
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 24, 2023
7097973
Merge branch 'master' of https://github.com/ray-project/ray into bulk…
jianoaix Jan 24, 2023
0b74edf
minimize diff
jianoaix Jan 24, 2023
a9a66ab
less diff
jianoaix Jan 24, 2023
a265437
disable incremental take test
jianoaix Jan 24, 2023
9 changes: 7 additions & 2 deletions python/ray/data/_internal/block_batching.py
@@ -8,6 +8,7 @@
 from ray.actor import ActorHandle
 from ray.data._internal.batcher import Batcher, ShufflingBatcher
 from ray.data._internal.stats import DatasetPipelineStats, DatasetStats
+from ray.data._internal.util import _trace_deallocation
 from ray.data.block import Block, BlockAccessor
 from ray.data.context import DatasetContext
 from ray.types import ObjectRef
@@ -183,8 +184,12 @@ def _sliding_window(iterable: Iterable, n: int, clear_block_after_read: bool = False):
         yield tuple(window)
     for elem in it:
         block_ref = window.popleft()
-        if clear_block_after_read:
-            ray._private.internal_api.free(block_ref, local_only=False)
+        if clear_block_after_read and DatasetContext.get_current().eager_free:
+            _trace_deallocation(block_ref, "block_batching._sliding_window")
+        else:
+            _trace_deallocation(
+                block_ref, "block_batching._sliding_window", freed=False
+            )
         window.append(elem)
         yield tuple(window)

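For reference, the sliding-window behavior above reduces to the following self-contained sketch; sliding_window here is a simplified, hypothetical stand-in for the private _sliding_window helper, with the block-freeing logic replaced by a comment:

import collections
import itertools
from typing import Iterable, Iterator, Tuple

def sliding_window(iterable: Iterable, n: int) -> Iterator[Tuple]:
    # Yield n-wide tuples that slide one element at a time over `iterable`.
    it = iter(iterable)
    window = collections.deque(itertools.islice(it, n), maxlen=n)
    if window:
        yield tuple(window)
    for elem in it:
        window.popleft()  # in the real helper, the popped block ref may be eagerly freed here
        window.append(elem)
        yield tuple(window)

assert list(sliding_window([1, 2, 3, 4], 2)) == [(1, 2), (2, 3), (3, 4)]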
3 changes: 3 additions & 0 deletions python/ray/data/_internal/block_list.py
@@ -4,6 +4,7 @@
 import numpy as np

 from ray.data.block import Block, BlockMetadata
+from ray.data._internal.util import _trace_allocation
 from ray.types import ObjectRef

@@ -23,6 +24,8 @@ def __init__(
         owned_by_consumer: bool,
     ):
         assert len(blocks) == len(metadata), (blocks, metadata)
+        for b in blocks:
+            _trace_allocation(b, "BlockList.__init__")
         self._blocks: List[ObjectRef[Block]] = blocks
         self._num_blocks = len(self._blocks)
         self._metadata: List[BlockMetadata] = metadata
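_trace_allocation and _trace_deallocation come from the new memory-tracing module added elsewhere in this PR, so their bodies don't appear in this diff. A minimal sketch of what such helpers might look like, assuming a DatasetContext.trace_allocations flag (flag name assumed) and reusing the ray._private.internal_api.free() call from the code path being replaced:

import ray
from ray.data.context import DatasetContext

_traces = []  # hypothetical in-process trace log

def _trace_allocation(ref, loc: str) -> None:
    # Record where a block ref entered the system, when tracing is enabled.
    if DatasetContext.get_current().trace_allocations:  # flag name assumed
        _traces.append(("alloc", ref, loc))

def _trace_deallocation(ref, loc: str, freed: bool = True) -> None:
    # Optionally free the block eagerly (as the replaced code did), then record it.
    if freed:
        ray._private.internal_api.free(ref, local_only=False)
    if DatasetContext.get_current().trace_allocations:  # flag name assumed
        _traces.append(("free" if freed else "skipped_free", ref, loc))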
6 changes: 6 additions & 0 deletions python/ray/data/_internal/compute.py
@@ -70,6 +70,9 @@ def _apply(
         fn_constructor_args: Optional[Iterable[Any]] = None,
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
     ) -> BlockList:
+        assert (
+            not DatasetContext.get_current().new_execution_backend
+        ), "Legacy backend off"
         assert fn_constructor_args is None and fn_constructor_kwargs is None
         if fn_args is None:
             fn_args = tuple()
@@ -237,6 +240,9 @@ def _apply(
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
     ) -> BlockList:
         """Note: this is not part of the Dataset public API."""
+        assert (
+            not DatasetContext.get_current().new_execution_backend
+        ), "Legacy backend off"
         if fn_args is None:
             fn_args = tuple()
         if fn_kwargs is None:
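These asserts make the legacy compute strategies fail fast when the new backend is enabled. A hedged sketch of the intended gating, using only the flag visible in the diff above (the dispatch itself is illustrative, not this PR's code):

from ray.data.context import DatasetContext

ctx = DatasetContext.get_current()
if ctx.new_execution_backend:
    # Plans are routed through the new executor (see bulk_executor.py below).
    ...
else:
    # Plans fall back to the legacy compute-strategy _apply() paths guarded above.
    ...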
Empty file. (Likely the new __init__.py for the operators package; see commit 6540381.)
99 changes: 99 additions & 0 deletions python/ray/data/_internal/execution/bulk_executor.py
@@ -0,0 +1,99 @@
import logging
from typing import Dict, List, Iterator, Optional

import ray
from ray.data._internal.execution.interfaces import (
    Executor,
    ExecutionOptions,
    RefBundle,
    PhysicalOperator,
)
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.stats import DatasetStats

logger = logging.getLogger(__name__)


class BulkExecutor(Executor):
    def __init__(self, options: ExecutionOptions):
        super().__init__(options)
        self._stats = DatasetStats(stages={}, parent=None)
        self._executed = False

    def execute(
        self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None
    ) -> Iterator[RefBundle]:
        """Synchronously executes the DAG via bottom-up recursive traversal."""

        assert not self._executed, "Can only call execute once."
        self._executed = True
        if not isinstance(dag, InputDataBuffer):
            logger.info("Executing DAG %s", dag)

        if initial_stats:
            self._stats = initial_stats

        saved_outputs: Dict[PhysicalOperator, List[RefBundle]] = {}

        def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
            # Avoid duplicate executions.
            if op in saved_outputs:
                return saved_outputs[op]

            # Compute dependencies.
            inputs = [execute_recursive(dep) for dep in op.input_dependencies]

            # Fully execute this operator.
            logger.debug("Executing op %s", op.name)
            builder = self._stats.child_builder(op.name)
            try:
                for i, ref_bundles in enumerate(inputs):
                    for r in ref_bundles:
                        op.add_input(r, input_index=i)
                    op.inputs_done(i)
                output = _naive_run_until_complete(op)
            finally:
                op.shutdown()

            # Cache and return output.
            saved_outputs[op] = output
            op_stats = op.get_stats()
            op_metrics = op.get_metrics()
            if op_stats:
                self._stats = builder.build_multistage(op_stats)
                self._stats.extra_metrics = op_metrics
            return output

        return execute_recursive(dag)

    def get_stats(self) -> DatasetStats:
        assert self._stats is not None, self._stats
        return self._stats


def _naive_run_until_complete(op: PhysicalOperator) -> List[RefBundle]:
    """Run this operator until completion, assuming all inputs have been submitted.

    Args:
        op: The operator to run.

    Returns:
        The list of output ref bundles for the operator.
    """
    output = []
    tasks = op.get_work_refs()
    if tasks:
        bar = ProgressBar(op.name, total=op.num_outputs_total())
        while tasks:
            done, _ = ray.wait(tasks, fetch_local=True, timeout=0.1)

Contributor:

@ericl I don't think that this needs a timeout; the loop below appears to be a no-op if no new tasks are done. Could you remind me if/why this timeout is needed?

Suggested change:
-            done, _ = ray.wait(tasks, fetch_local=True, timeout=0.1)
+            done, _ = ray.wait(tasks, fetch_local=True)

Contributor Author (ericl, Dec 22, 2022):

This avoids high-CPU spins, and also gives us a chance to interrupt. Btw, this isn't new code, just a copy of the existing implementation of bulk wait.

Contributor:

> This avoids high-CPU spins

Hmm, IIRC ray.wait() should result in a condition-variable wait somewhere in Ray Core, so shouldn't that not result in a high-CPU spin? Unless we're not compiling Ray with the correct flags on Linux...

> also gives us a chance to interrupt

ray.wait() is already interruptible; it checks for signals every second regardless of the user-provided timeout, IIRC.

> Btw, this isn't new code, just a copy of the existing implementation of bulk wait.

Understood, but we really don't want to cargo-cult suboptimal code into the new execution model if we can help it. We've already decided to use this redesign as an opportunity to address some long-standing tech debt and raise our quality bar.

Contributor Author (ericl):

This isn't the new execution model, though; it's just the legacy shim until we write the streaming executor. Hence, I'd like to avoid unnecessary execution-detail changes that could cause behavior changes.

I do think this is the correct choice, though, for a couple of reasons:

  • Setting timeout=0 has historically caused issues with not returning all futures in a timely manner. I do not know if this issue has been fully addressed.
  • Setting timeout to a large value is problematic for obvious reasons.
  • Setting no timeout is also problematic, since then you need to set num_returns, breaking batching.

Hence, this is a reasonable, if conservative, choice.

Contributor (clarkzinzow, Jan 3, 2023):

> This isn't the new execution model, though; it's just the legacy shim until we write the streaming executor.

That makes sense; we can keep the old implementation for the legacy shim and defer any wait-loop tweaks until the streaming executor PR.

> Setting timeout=0 has historically caused issues with not returning all futures in a timely manner.

I don't think this would be ideal anyway, since we'd just be trading the relatively efficient Core Worker wait plus periodic signal check for an inefficient application-level spin loop. So agreed that this wouldn't be a good route.

> Setting no timeout is also problematic, since then you need to set num_returns, breaking batching.

Hmm, what do you mean by "batching" here?

If you mean batching of multiple "ready" returns, num_returns is set to 1 by default, so that kind of batching is already not happening here: with num_returns=1, we immediately return a single ready ref as soon as it becomes available, right? Quick example demonstrating these unbatched semantics:

In [2]: o1 = ray.put(1)

In [3]: o2 = ray.put(2)

In [4]: done, not_done = ray.wait([o1, o2], fetch_local=True, timeout=0.1)

In [5]: done
Out[5]: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002000000)]

In [6]: not_done
Out[6]: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000)]

If you are talking about concurrent object pulls (which I don't think you are, since that shouldn't matter here), that was fixed in Ray Core (with the bug + fix that you discovered) so that len(tasks) is fetched instead of num_returns. #30724

In any case, unlike the streaming executor, which wants to know immediately that a task has finished in order to (potentially) launch new work (and should therefore have no timeout in its wait loop), the bulk executor only cares about updating the user-facing progress and stats reporting, so batching at a 100 ms granularity is pretty reasonable. If that 100 ms batching is the intention, then I think we would want to specify num_returns:

Suggested change:
-            done, _ = ray.wait(tasks, fetch_local=True, timeout=0.1)
+            done, _ = ray.wait(tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1)

But yeah, we can just keep the existing bulk-execution wait-loop behavior, since this is just a legacy shim.

Contributor Author (ericl):

> If you mean batching of multiple "ready" returns, num_returns is set to 1 by default

Ah, good catch. Seems like I missed this; indeed, this is the batching I meant.

            for ready in done:
                op.notify_work_completed(ready)
            tasks = op.get_work_refs()

Contributor:

Nit: since _naive_run_until_complete assumes that all operator inputs have already been submitted, it seems like we could take a single static tasks = op.get_work_refs() snapshot and work only with those refs, right?

Suggested change:
-            done, _ = ray.wait(tasks, fetch_local=True, timeout=0.1)
-            for ready in done:
-                op.notify_work_completed(ready)
-            tasks = op.get_work_refs()
+            done, tasks = ray.wait(tasks, fetch_local=True)
+            for ready in done:
+                op.notify_work_completed(ready)

I understand that the op.notify_work_completed() + op.get_work_refs() protocol is supposed to ensure that op.get_work_refs() only returns the "incomplete" work while supporting interleaved op.add_input() additions to the work queue, so I'm guessing that you want to use this same PhysicalOperator-level protocol across both the bulk and pipeline executors, even if the protocol is a bit overpowered for the BulkExecutor?

Contributor Author (ericl):

This method is designed to work for "any possible operator" that correctly implements the interface, not just MapOperator. And yes, the operators fully support streaming.

            while op.has_next():
                bar.update(1)
                output.append(op.get_next())
        bar.close()
    while op.has_next():
        output.append(op.get_next())

Contributor:

Two questions:

  1. Why is this needed? After the last of op.get_work_refs() has finished, all operator outputs should be done (in order, if needed), so the previous while op.has_next() loop should completely consume the operator outputs.
  2. If this is needed, shouldn't we be updating the progress bar here?

If (1) is the case (i.e. this is not needed), this could be replaced with an assertion to that effect.

Suggested change:
-    while op.has_next():
-        output.append(op.get_next())
+    assert not op.has_next()

Contributor Author (ericl):

This is true for the Map operator, but other operators can have arbitrary behavior, such as no tasks at all.

Contributor:

I get the no-tasks case (and if that's the only case, then maybe this should be in an else: block to make that clear), but if an operator does have tasks, I don't see how the last iteration of the while tasks loop could consume all of the available operator outputs and then have more operator outputs available right after exiting that loop, without any new operator event or state change.

> but other operators can have arbitrary behavior

Well, they can't have completely arbitrary behavior; the executor needs some contract around when it can expect the operator to have new outputs, and when it can expect the operator's output production to be finished. E.g., one sensible contract between the executor and the operator would be:

  1. If an operator has 1 or more tasks, then it only produces more outputs after a task completes (i.e. after op.notify_work_completed() is called).
  2. If an operator has 0 tasks, then it has a static set of outputs.

For (1), we try to consume the operator outputs whenever a task finishes; for (2), we do a single pass of operator output consumption. Whatever the contract, I'm just arguing for making it as explicit as possible in the executor code, so devs (like me) aren't left wondering, "Why would we need this last operator-output consumption if the operator had tasks? Shouldn't it have been exhausted at the end of the task loop?"

Contributor Author (ericl):

Yes, good point. I added some comments in the docstring and here to clarify this completion behavior.

Contributor:

It might be clearer to add a method that indicates this completion state of the operator. The added comments at has_next() and get_work_refs() seem a bit off-topic for them.

    return output
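To make the control flow above concrete, here is a hedged usage sketch (not part of this PR's diff). build_dag() is hypothetical and stands in for whatever wires the PhysicalOperator DAG together; only BulkExecutor, ExecutionOptions, execute(), and get_stats() appear above:

from ray.data._internal.execution.bulk_executor import BulkExecutor
from ray.data._internal.execution.interfaces import ExecutionOptions, PhysicalOperator

def run(dag: PhysicalOperator):
    # Each BulkExecutor is one-shot: execute() may only be called once.
    executor = BulkExecutor(ExecutionOptions())
    # execute() returns the fully materialized outputs of the DAG's root operator.
    for bundle in executor.execute(dag):
        for block_ref, metadata in bundle.blocks:
            ...  # consume each output block ref
    return executor.get_stats()

# dag = build_dag()  # hypothetical DAG construction
# stats = run(dag)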
12 changes: 11 additions & 1 deletion python/ray/data/_internal/execution/interfaces.py
@@ -3,7 +3,9 @@

 import ray
 from ray.data._internal.stats import DatasetStats, StatsDict
+from ray.data._internal.util import _trace_deallocation
 from ray.data.block import Block, BlockMetadata
+from ray.data.context import DatasetContext
 from ray.types import ObjectRef


@@ -62,7 +64,15 @@ def destroy_if_owned(self) -> int:
         Returns:
             The number of bytes freed.
         """
-        raise NotImplementedError
+        if self.owns_blocks and DatasetContext.get_current().eager_free:
+            size = self.size_bytes()
+            for b in self.blocks:
+                _trace_deallocation(b[0], "RefBundle.destroy_if_owned")
+            return size
+        else:
+            for b in self.blocks:
+                _trace_deallocation(b[0], "RefBundle.destroy_if_owned", freed=False)
+            return 0


 @dataclass
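The ownership flag above decides whether a consumer may eagerly free a bundle's blocks. A small sketch of the intended semantics (bundle is assumed to be a RefBundle handed out by an executor):

from ray.data.context import DatasetContext

DatasetContext.get_current().eager_free = True  # flag used in the diff above

# If bundle.owns_blocks is True, destroy_if_owned() frees the block refs and
# returns the number of bytes freed; otherwise it records a skipped free and
# returns 0, leaving the blocks alive for other readers.
freed_bytes = bundle.destroy_if_owned()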