Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make execution plan/blocklist aware of the memory ownership and who runs the plan #26650

Merged
merged 36 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
6414e58
Make execution stage/blocklist aware of whether it's run by Dataset or Datase…
Jul 17, 2022
103ebde
Merge branch 'master' of https://github.com/ray-project/ray into pipe…
Jul 18, 2022
9a8081c
fix split
Jul 18, 2022
65b7aeb
fix and test
Jul 18, 2022
445d38d
[AIR/Docs] Add Predictor Docs (#25833)
amogkam Jul 17, 2022
18f46d6
spread-split-tasks (#26638)
scv119 Jul 17, 2022
41295f4
[air] Add _max_cpu_fraction_per_node to ScalingConfig and documentati…
ericl Jul 17, 2022
3d4af4a
[RLlib] improved unittests for dataset_reader and fixed bugs (#26458)
kouroshHakha Jul 17, 2022
e999300
[RLlib]: Fix OPE trainables (#26279)
Rohan138 Jul 17, 2022
215d928
[Datasets] [Local Shuffle - 1/N] Add local shuffling option. (#26094)
clarkzinzow Jul 17, 2022
9faca47
[air] Add a warning if no CPUs are reserved for dataset execution (#2…
ericl Jul 17, 2022
d71114c
added summary why and when to use bulk vs streaming data ingest (#26637)
dmatrix Jul 18, 2022
8594d20
[AIR][CUJ] Make distributing training benchmark at silver tier (#26640)
jiaodong Jul 18, 2022
95cfc7d
[Data][split] use split_at_indices for equal split without locality h…
scv119 Jul 18, 2022
78783dc
[Air][Data] Don't promote locality_hints for split (#26647)
scv119 Jul 18, 2022
3bb72c1
Add Tao as Java worker code owner. (#26596)
jovany-wang Jul 18, 2022
0d9216c
[RLlib] Add/reorder Args of Prioritized/MixIn MultiAgentReplayBuffer.…
ArturNiederfahrenhorst Jul 18, 2022
5608c82
[Serve] Remove EXPERIMENTAL inside the comments for user config (#26521)
sihanwang41 Jul 18, 2022
16322fa
Fix windows buildkite (#26615)
jjyao Jul 18, 2022
2d92130
fix split
Jul 18, 2022
9e71304
fix and test
Jul 18, 2022
5f8b8fd
Merge branch 'master' of https://github.com/ray-project/ray into pipe…
Jul 18, 2022
060d7f7
Merge branch 'pipelineplan' of https://github.com/jianoaix/ray into p…
Jul 18, 2022
28ccc5e
test
Jul 18, 2022
9c32ae0
consumable vs. non-consumable blocklist
jianoaix Jul 19, 2022
4b83ac1
lint; test
jianoaix Jul 19, 2022
84faf33
feedback: naming/concept
jianoaix Jul 19, 2022
397308f
Merge branch 'master' of https://github.com/ray-project/ray into pipe…
jianoaix Jul 19, 2022
b4db0e6
feedback: run_by_pipeline->run_by_consumer
jianoaix Jul 20, 2022
9c41a71
feedback: make owned_by_consumer required keyword-only arg
jianoaix Jul 20, 2022
13c9ed6
feedback: make run_by_consumer required keyword-only arg; and make th…
jianoaix Jul 21, 2022
5d1ca73
typo
jianoaix Jul 21, 2022
0942498
undo no-op changes on stage_impl.py
jianoaix Jul 21, 2022
91b0b2f
undo continued
jianoaix Jul 21, 2022
6d3e9aa
fix
jianoaix Jul 21, 2022
01f3da9
feedback: document transformation vs. consumption concept
jianoaix Jul 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions python/ray/data/_internal/block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,30 @@ class BlockList:
change after execution due to block splitting.
"""

def __init__(self, blocks: List[ObjectRef[Block]], metadata: List[BlockMetadata]):
def __init__(
self,
blocks: List[ObjectRef[Block]],
metadata: List[BlockMetadata],
*,
owned_by_consumer: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it the idea that if the blocks are owned_by_consumer, then we can eagerly clean them up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, once it's consumed, the blocklist can be cleared.

):
assert len(blocks) == len(metadata), (blocks, metadata)
self._blocks: List[ObjectRef[Block]] = blocks
self._num_blocks = len(self._blocks)
self._metadata: List[BlockMetadata] = metadata
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we clearly define what is consumer in one place, e.g. dataset.py?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]:
"""Get the metadata for all blocks."""
return self._metadata.copy()

def copy(self) -> "BlockList":
"""Perform a shallow copy of this BlockList."""
return BlockList(self._blocks, self._metadata)
return BlockList(
self._blocks, self._metadata, owned_by_consumer=self._owned_by_consumer
)

def clear(self) -> None:
"""Erase references to the tasks tracked by the BlockList."""
Expand Down Expand Up @@ -57,7 +68,11 @@ def split(self, split_size: int) -> List["BlockList"]:
meta = np.array_split(self._metadata, num_splits)
output = []
for b, m in zip(blocks, meta):
output.append(BlockList(b.tolist(), m.tolist()))
output.append(
BlockList(
b.tolist(), m.tolist(), owned_by_consumer=self._owned_by_consumer
)
)
return output

def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
Expand All @@ -78,15 +93,23 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
)
size = m.size_bytes
if cur_blocks and cur_size + size > bytes_per_split:
output.append(BlockList(cur_blocks, cur_meta))
output.append(
BlockList(
cur_blocks, cur_meta, owned_by_consumer=self._owned_by_consumer
)
)
cur_blocks = []
cur_meta = []
cur_size = 0
cur_blocks.append(b)
cur_meta.append(m)
cur_size += size
if cur_blocks:
output.append(BlockList(cur_blocks, cur_meta))
output.append(
BlockList(
cur_blocks, cur_meta, owned_by_consumer=self._owned_by_consumer
)
)
return output

def size_bytes(self) -> int:
Expand All @@ -110,8 +133,16 @@ def divide(self, block_idx: int) -> ("BlockList", "BlockList"):
"""
self._check_if_cleared()
return (
BlockList(self._blocks[:block_idx], self._metadata[:block_idx]),
BlockList(self._blocks[block_idx:], self._metadata[block_idx:]),
BlockList(
self._blocks[:block_idx],
self._metadata[:block_idx],
owned_by_consumer=self._owned_by_consumer,
),
BlockList(
self._blocks[block_idx:],
self._metadata[block_idx:],
owned_by_consumer=self._owned_by_consumer,
),
)

def get_blocks(self) -> List[ObjectRef[Block]]:
Expand Down Expand Up @@ -180,4 +211,4 @@ def randomize_block_order(self, seed: Optional[int] = None) -> "BlockList":
random.shuffle(blocks_with_metadata)
blocks, metadata = map(list, zip(*blocks_with_metadata))

return BlockList(blocks, metadata)
return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer)
12 changes: 10 additions & 2 deletions python/ray/data/_internal/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def _apply(
data_refs = [r[0] for r in all_refs]
refs = [r[1] for r in all_refs]

in_block_owned_by_consumer = block_list._owned_by_consumer
# Release input block references.
if clear_input_blocks:
del blocks
Expand Down Expand Up @@ -134,7 +135,11 @@ def _apply(
for block, metadata in zip(data_refs, results):
new_blocks.append(block)
new_metadata.append(metadata)
return BlockList(list(new_blocks), list(new_metadata))
return BlockList(
list(new_blocks),
list(new_metadata),
owned_by_consumer=in_block_owned_by_consumer,
)


@PublicAPI
Expand Down Expand Up @@ -213,6 +218,7 @@ def _apply(
context = DatasetContext.get_current()

blocks_in = block_list.get_blocks_with_metadata()
owned_by_consumer = block_list._owned_by_consumer

# Early release block references.
if clear_input_blocks:
Expand Down Expand Up @@ -368,7 +374,9 @@ def map_block_nosplit(
new_blocks.append(block)
new_metadata.append(metadata_mapping[block])
new_metadata = ray.get(new_metadata)
return BlockList(new_blocks, new_metadata)
return BlockList(
new_blocks, new_metadata, owned_by_consumer=owned_by_consumer
)

except Exception as e:
try:
Expand Down
12 changes: 10 additions & 2 deletions python/ray/data/_internal/fast_repartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ def fast_repartition(blocks, num_blocks):
from ray.data.dataset import Dataset

wrapped_ds = Dataset(
ExecutionPlan(blocks, DatasetStats(stages={}, parent=None)), 0, lazy=False
ExecutionPlan(
blocks,
DatasetStats(stages={}, parent=None),
run_by_consumer=blocks._owned_by_consumer,
),
0,
lazy=False,
)
# Compute the (n-1) indices needed for an equal split of the data.
count = wrapped_ds.count()
Expand All @@ -40,6 +46,8 @@ def fast_repartition(blocks, num_blocks):
if s.num_blocks() > 0
]

owned_by_consumer = blocks._owned_by_consumer

# Early-release memory.
del splits, blocks, wrapped_ds

Expand Down Expand Up @@ -71,4 +79,4 @@ def fast_repartition(blocks, num_blocks):
new_blocks += empty_blocks
new_metadata += empty_metadata

return BlockList(new_blocks, new_metadata), {}
return BlockList(new_blocks, new_metadata, owned_by_consumer=owned_by_consumer), {}
21 changes: 19 additions & 2 deletions python/ray/data/_internal/lazy_block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def __init__(
cached_metadata: Optional[List[BlockPartitionMetadata]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
stats_uuid: str = None,
*,
owned_by_consumer: bool,
):
"""Create a LazyBlockList on the provided read tasks.

Expand Down Expand Up @@ -90,6 +92,9 @@ def __init__(
tasks,
self._cached_metadata,
)
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer

def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]:
"""Get the metadata for all blocks."""
Expand Down Expand Up @@ -121,6 +126,7 @@ def copy(self) -> "LazyBlockList":
block_partition_meta_refs=self._block_partition_meta_refs.copy(),
cached_metadata=self._cached_metadata,
ray_remote_args=self._remote_args.copy(),
owned_by_consumer=self._owned_by_consumer,
stats_uuid=self._stats_uuid,
)

Expand Down Expand Up @@ -159,6 +165,7 @@ def split(self, split_size: int) -> List["LazyBlockList"]:
b.tolist(),
m.tolist(),
c.tolist(),
owned_by_consumer=self._owned_by_consumer,
)
)
return output
Expand Down Expand Up @@ -187,6 +194,7 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
cur_blocks,
cur_blocks_meta,
cur_cached_meta,
owned_by_consumer=self._owned_by_consumer,
),
)
cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta = [], [], [], []
Expand All @@ -198,7 +206,13 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
cur_size += size
if cur_blocks:
output.append(
LazyBlockList(cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta)
LazyBlockList(
cur_tasks,
cur_blocks,
cur_blocks_meta,
cur_cached_meta,
owned_by_consumer=self._owned_by_consumer,
)
)
return output

Expand All @@ -209,12 +223,14 @@ def divide(self, part_idx: int) -> ("LazyBlockList", "LazyBlockList"):
self._block_partition_refs[:part_idx],
self._block_partition_meta_refs[:part_idx],
self._cached_metadata[:part_idx],
owned_by_consumer=self._owned_by_consumer,
)
right = LazyBlockList(
self._tasks[part_idx:],
self._block_partition_refs[part_idx:],
self._block_partition_meta_refs[part_idx:],
self._cached_metadata[part_idx:],
owned_by_consumer=self._owned_by_consumer,
)
return left, right

Expand Down Expand Up @@ -281,7 +297,7 @@ def _get_blocks_with_metadata(
def compute_to_blocklist(self) -> BlockList:
"""Launch all tasks and return a concrete BlockList."""
blocks, metadata = self._get_blocks_with_metadata()
return BlockList(blocks, metadata)
return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer)

def compute_first_block(self):
"""Kick off computation for the first block in the list.
Expand Down Expand Up @@ -432,6 +448,7 @@ def randomize_block_order(self, seed: Optional[int] = None) -> "LazyBlockList":
block_partition_meta_refs=block_partition_meta_refs,
cached_metadata=cached_metadata,
ray_remote_args=self._remote_args.copy(),
owned_by_consumer=self._owned_by_consumer,
stats_uuid=self._stats_uuid,
)

Expand Down
Loading