Skip to content

Commit

Permalink
Make execution plan/blocklist aware of the memory ownership and who runs the plan (ray-project#26650)
Browse files Browse the repository at this point in the history

Having an indicator of who is running the stage and who created a blocklist will enable eager memory release.

This is an alternative to ray-project#26196 with better abstraction.

Note: this doesn't work for Dataset.split() yet, will do in a followup PR.
Signed-off-by: Stefan van der Kleij <[email protected]>
  • Loading branch information
jianoaix authored and Stefan van der Kleij committed Aug 18, 2022
1 parent fe26cab commit c7ac23f
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 131 deletions.
47 changes: 39 additions & 8 deletions python/ray/data/_internal/block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,30 @@ class BlockList:
change after execution due to block splitting.
"""

def __init__(self, blocks: List[ObjectRef[Block]], metadata: List[BlockMetadata]):
def __init__(
self,
blocks: List[ObjectRef[Block]],
metadata: List[BlockMetadata],
*,
owned_by_consumer: bool,
):
assert len(blocks) == len(metadata), (blocks, metadata)
self._blocks: List[ObjectRef[Block]] = blocks
self._num_blocks = len(self._blocks)
self._metadata: List[BlockMetadata] = metadata
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer

def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]:
    """Return a shallow copy of the metadata for every block in this list.

    The ``fetch_if_missing`` flag is accepted for interface compatibility
    with lazy block lists; in this concrete list the metadata is already
    materialized, so the flag is ignored and a copy is returned
    unconditionally.
    """
    return list(self._metadata)

def copy(self) -> "BlockList":
"""Perform a shallow copy of this BlockList."""
return BlockList(self._blocks, self._metadata)
return BlockList(
self._blocks, self._metadata, owned_by_consumer=self._owned_by_consumer
)

def clear(self) -> None:
"""Erase references to the tasks tracked by the BlockList."""
Expand Down Expand Up @@ -57,7 +68,11 @@ def split(self, split_size: int) -> List["BlockList"]:
meta = np.array_split(self._metadata, num_splits)
output = []
for b, m in zip(blocks, meta):
output.append(BlockList(b.tolist(), m.tolist()))
output.append(
BlockList(
b.tolist(), m.tolist(), owned_by_consumer=self._owned_by_consumer
)
)
return output

def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
Expand All @@ -78,15 +93,23 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
)
size = m.size_bytes
if cur_blocks and cur_size + size > bytes_per_split:
output.append(BlockList(cur_blocks, cur_meta))
output.append(
BlockList(
cur_blocks, cur_meta, owned_by_consumer=self._owned_by_consumer
)
)
cur_blocks = []
cur_meta = []
cur_size = 0
cur_blocks.append(b)
cur_meta.append(m)
cur_size += size
if cur_blocks:
output.append(BlockList(cur_blocks, cur_meta))
output.append(
BlockList(
cur_blocks, cur_meta, owned_by_consumer=self._owned_by_consumer
)
)
return output

def size_bytes(self) -> int:
Expand All @@ -110,8 +133,16 @@ def divide(self, block_idx: int) -> ("BlockList", "BlockList"):
"""
self._check_if_cleared()
return (
BlockList(self._blocks[:block_idx], self._metadata[:block_idx]),
BlockList(self._blocks[block_idx:], self._metadata[block_idx:]),
BlockList(
self._blocks[:block_idx],
self._metadata[:block_idx],
owned_by_consumer=self._owned_by_consumer,
),
BlockList(
self._blocks[block_idx:],
self._metadata[block_idx:],
owned_by_consumer=self._owned_by_consumer,
),
)

def get_blocks(self) -> List[ObjectRef[Block]]:
Expand Down Expand Up @@ -180,4 +211,4 @@ def randomize_block_order(self, seed: Optional[int] = None) -> "BlockList":
random.shuffle(blocks_with_metadata)
blocks, metadata = map(list, zip(*blocks_with_metadata))

return BlockList(blocks, metadata)
return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer)
12 changes: 10 additions & 2 deletions python/ray/data/_internal/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def _apply(
data_refs = [r[0] for r in all_refs]
refs = [r[1] for r in all_refs]

in_block_owned_by_consumer = block_list._owned_by_consumer
# Release input block references.
if clear_input_blocks:
del blocks
Expand Down Expand Up @@ -134,7 +135,11 @@ def _apply(
for block, metadata in zip(data_refs, results):
new_blocks.append(block)
new_metadata.append(metadata)
return BlockList(list(new_blocks), list(new_metadata))
return BlockList(
list(new_blocks),
list(new_metadata),
owned_by_consumer=in_block_owned_by_consumer,
)


@PublicAPI
Expand Down Expand Up @@ -213,6 +218,7 @@ def _apply(
context = DatasetContext.get_current()

blocks_in = block_list.get_blocks_with_metadata()
owned_by_consumer = block_list._owned_by_consumer

# Early release block references.
if clear_input_blocks:
Expand Down Expand Up @@ -368,7 +374,9 @@ def map_block_nosplit(
new_blocks.append(block)
new_metadata.append(metadata_mapping[block])
new_metadata = ray.get(new_metadata)
return BlockList(new_blocks, new_metadata)
return BlockList(
new_blocks, new_metadata, owned_by_consumer=owned_by_consumer
)

except Exception as e:
try:
Expand Down
12 changes: 10 additions & 2 deletions python/ray/data/_internal/fast_repartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ def fast_repartition(blocks, num_blocks):
from ray.data.dataset import Dataset

wrapped_ds = Dataset(
ExecutionPlan(blocks, DatasetStats(stages={}, parent=None)), 0, lazy=False
ExecutionPlan(
blocks,
DatasetStats(stages={}, parent=None),
run_by_consumer=blocks._owned_by_consumer,
),
0,
lazy=False,
)
# Compute the (n-1) indices needed for an equal split of the data.
count = wrapped_ds.count()
Expand All @@ -40,6 +46,8 @@ def fast_repartition(blocks, num_blocks):
if s.num_blocks() > 0
]

owned_by_consumer = blocks._owned_by_consumer

# Early-release memory.
del splits, blocks, wrapped_ds

Expand Down Expand Up @@ -71,4 +79,4 @@ def fast_repartition(blocks, num_blocks):
new_blocks += empty_blocks
new_metadata += empty_metadata

return BlockList(new_blocks, new_metadata), {}
return BlockList(new_blocks, new_metadata, owned_by_consumer=owned_by_consumer), {}
21 changes: 19 additions & 2 deletions python/ray/data/_internal/lazy_block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def __init__(
cached_metadata: Optional[List[BlockPartitionMetadata]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
stats_uuid: str = None,
*,
owned_by_consumer: bool,
):
"""Create a LazyBlockList on the provided read tasks.
Expand Down Expand Up @@ -90,6 +92,9 @@ def __init__(
tasks,
self._cached_metadata,
)
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer

def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]:
"""Get the metadata for all blocks."""
Expand Down Expand Up @@ -121,6 +126,7 @@ def copy(self) -> "LazyBlockList":
block_partition_meta_refs=self._block_partition_meta_refs.copy(),
cached_metadata=self._cached_metadata,
ray_remote_args=self._remote_args.copy(),
owned_by_consumer=self._owned_by_consumer,
stats_uuid=self._stats_uuid,
)

Expand Down Expand Up @@ -159,6 +165,7 @@ def split(self, split_size: int) -> List["LazyBlockList"]:
b.tolist(),
m.tolist(),
c.tolist(),
owned_by_consumer=self._owned_by_consumer,
)
)
return output
Expand Down Expand Up @@ -187,6 +194,7 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
cur_blocks,
cur_blocks_meta,
cur_cached_meta,
owned_by_consumer=self._owned_by_consumer,
),
)
cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta = [], [], [], []
Expand All @@ -198,7 +206,13 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
cur_size += size
if cur_blocks:
output.append(
LazyBlockList(cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta)
LazyBlockList(
cur_tasks,
cur_blocks,
cur_blocks_meta,
cur_cached_meta,
owned_by_consumer=self._owned_by_consumer,
)
)
return output

Expand All @@ -209,12 +223,14 @@ def divide(self, part_idx: int) -> ("LazyBlockList", "LazyBlockList"):
self._block_partition_refs[:part_idx],
self._block_partition_meta_refs[:part_idx],
self._cached_metadata[:part_idx],
owned_by_consumer=self._owned_by_consumer,
)
right = LazyBlockList(
self._tasks[part_idx:],
self._block_partition_refs[part_idx:],
self._block_partition_meta_refs[part_idx:],
self._cached_metadata[part_idx:],
owned_by_consumer=self._owned_by_consumer,
)
return left, right

Expand Down Expand Up @@ -281,7 +297,7 @@ def _get_blocks_with_metadata(
def compute_to_blocklist(self) -> BlockList:
"""Launch all tasks and return a concrete BlockList."""
blocks, metadata = self._get_blocks_with_metadata()
return BlockList(blocks, metadata)
return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer)

def compute_first_block(self):
"""Kick off computation for the first block in the list.
Expand Down Expand Up @@ -432,6 +448,7 @@ def randomize_block_order(self, seed: Optional[int] = None) -> "LazyBlockList":
block_partition_meta_refs=block_partition_meta_refs,
cached_metadata=cached_metadata,
ray_remote_args=self._remote_args.copy(),
owned_by_consumer=self._owned_by_consumer,
stats_uuid=self._stats_uuid,
)

Expand Down
Loading

0 comments on commit c7ac23f

Please sign in to comment.