Skip to content

Commit

Permalink
[data] Fix test_resource_manager.py. (#43547)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulchen authored Feb 29, 2024
1 parent 9e419f4 commit 5f99c8e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 37 deletions.
40 changes: 18 additions & 22 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def _estimate_object_store_memory(self, op, state) -> int:

# Op's external output buffer.
mem_op_outputs = state.outqueue_memory_usage()
# Input buffers of the downstream operators.
for next_op in op.output_dependencies:
mem_op_outputs += (
next_op.metrics.obj_store_mem_internal_inqueue
Expand Down Expand Up @@ -270,7 +271,7 @@ class ReservationOpResourceAllocator(OpResourceAllocator):
num_map_ops` resources, half of which is reserved only for the operator outputs,
excluding pending task outputs.
3. Non-reserved resources are shared among all operators.
3. In each scheduling iteration, each map operator will get "remaining of their own
4. In each scheduling iteration, each map operator will get "remaining of their own
reserved resources" + "remaining of shared resources / num_map_ops" resources.
The `reservation_ratio` is set to 50% by default. Users can tune this value to
Expand Down Expand Up @@ -353,8 +354,8 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
# i.e., `ResourceManager._mem_op_outputs`.
#
# Note, if we don't reserve memory for op outputs, all the budget may be used by
# the pending task outputs, and/or op' internal output buffers (the latter can
# happen when `preserve_order=True`.
# the pending task outputs, and/or op's internal output buffers (the latter can
# happen when `preserve_order=True`).
# Then we'll have no budget to pull blocks from the op.
self._reserved_for_op_outputs: Dict[PhysicalOperator, int] = {}
# Total shared resources.
Expand Down Expand Up @@ -486,21 +487,26 @@ def _should_unblock_streaming_output_backpressure(
return True
return False

def _op_outputs_reserved_remaining(self, op: PhysicalOperator) -> int:
def _get_op_outputs_usage_with_downstream(self, op: PhysicalOperator) -> int:
"""Get the outputs memory usage of the given operator, including the downstream
non-Map operators.
"""
# Outputs usage of the current operator.
outputs_usage = self._resource_manager._mem_op_outputs[op]
op_outputs_usage = self._resource_manager._mem_op_outputs[op]
# Also account the downstream non-Map operators' memory usage.
outputs_usage += sum(
op_outputs_usage += sum(
self._resource_manager.get_op_usage(next_op).object_store_memory
for next_op in self._get_downstream_non_map_ops(op)
)
return max(self._reserved_for_op_outputs[op] - outputs_usage, 0)
return op_outputs_usage

def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
if op not in self._op_budgets:
return None
res = self._op_budgets[op].object_store_memory
res += self._op_outputs_reserved_remaining(op)
# Add the remaining of `_reserved_for_op_outputs`.
op_outputs_usage = self._get_op_outputs_usage_with_downstream(op)
res += max(self._reserved_for_op_outputs[op] - op_outputs_usage, 0)
assert res >= 0
if res == 0 and self._should_unblock_streaming_output_backpressure(op):
res = 1
Expand Down Expand Up @@ -550,20 +556,10 @@ def update_usages(self):
# Add the memory usage of the operator itself,
# excluding `_reserved_for_op_outputs`.
op_mem_usage += self._resource_manager._mem_op_internal[op]
op_mem_usage += max(
self._resource_manager._mem_op_outputs[op]
- self._reserved_for_op_outputs[op],
0,
)
# Also account the downstream non-Map operators' memory usage
# to the current Map operator.
# This is because we don't directly throttle non-Map operators.
# So if they are using too much memory, we should throttle their
# upstream Map operator.
op_mem_usage += sum(
self._resource_manager.get_op_usage(next_op).object_store_memory
for next_op in self._get_downstream_non_map_ops(op)
)
# Add the portion of op outputs usage that has
# exceeded `_reserved_for_op_outputs`.
op_outputs_usage = self._get_op_outputs_usage_with_downstream(op)
op_mem_usage += max(op_outputs_usage - self._reserved_for_op_outputs[op], 0)
op_usage = copy.deepcopy(self._resource_manager.get_op_usage(op))
op_usage.object_store_memory = op_mem_usage
op_reserved = self._op_reserved[op]
Expand Down
30 changes: 15 additions & 15 deletions python/ray/data/tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ def test_update_usage(self):
assert op_usage.object_store_memory == expected_mem
if op != o1:
assert (
resource_manager._mem_pending_task_outputs[op]
== mock_pending_task_outputs[op]
resource_manager._mem_op_internal[op]
== mock_pending_task_outputs[op] + mock_internal_outqueue[op]
)
assert (
resource_manager._mem_op_outputs[op]
== expected_mem - mock_pending_task_outputs[op]
== expected_mem - resource_manager._mem_op_internal[op]
)
global_cpu += mock_cpu[op]
global_mem += expected_mem
Expand Down Expand Up @@ -319,7 +319,7 @@ def test_basic(self, restore_data_context):
o4 = LimitOperator(1, o3)

op_usages = {op: ExecutionResources.zero() for op in [o1, o2, o3, o4]}
pending_task_outputs_usages = {op: 0 for op in [o1, o2, o3, o4]}
op_internal_usage = {op: 0 for op in [o1, o2, o3, o4]}
op_outputs_usages = {op: 0 for op in [o1, o2, o3, o4]}

topo, _ = build_streaming_topology(o4, ExecutionOptions())
Expand All @@ -332,7 +332,7 @@ def mock_get_global_limits():

resource_manager = ResourceManager(topo, ExecutionOptions())
resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
resource_manager._mem_pending_task_outputs = pending_task_outputs_usages
resource_manager._mem_op_internal = op_internal_usage
resource_manager._mem_op_outputs = op_outputs_usages

resource_manager.get_global_limits = MagicMock(
Expand Down Expand Up @@ -378,10 +378,10 @@ def mock_get_global_limits():

# Test when each operator uses some resources.
op_usages[o2] = ExecutionResources(6, 0, 500)
pending_task_outputs_usages[o2] = 400
op_internal_usage[o2] = 400
op_outputs_usages[o2] = 100
op_usages[o3] = ExecutionResources(2, 0, 125)
pending_task_outputs_usages[o3] = 30
op_internal_usage[o3] = 30
op_outputs_usages[o3] = 25
op_usages[o4] = ExecutionResources(0, 0, 50)

Expand All @@ -393,20 +393,20 @@ def mock_get_global_limits():
# +-----+------------------+------------------+--------------+
# | op2 | 125/0 | 100/25 | 400-125=275 |
# +-----+------------------+------------------+--------------+
# | op3 | (30+50)/45 | 25/100 | 0 |
# | op3 | 30/95 | (25+50)/50 | 0 |
# +-----+------------------+------------------+--------------+
# remaining shared = 1000/2 - 275 = 225
# Test budgets.
# memory_budget[o2] = 0 + 225/2 = 112.5
assert allocator._op_budgets[o2] == ExecutionResources(3, float("inf"), 112.5)
# memory_budget[o3] = 45 + 225/2 = 157.5
assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 157.5)
# memory_budget[o3] = 95 + 225/2 = 207.5
assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 207.5)
# Test can_submit_new_task and max_task_output_bytes_to_read.
assert allocator.can_submit_new_task(o2)
assert allocator.can_submit_new_task(o3)
# max_task_output_bytes_to_read(o2) = 112.5 + 25 = 137.5
assert allocator.max_task_output_bytes_to_read(o2) == 137.5
# max_task_output_bytes_to_read(o3) = 157.5 + 100 = 257.5
# max_task_output_bytes_to_read(o3) = 207.5 + 50 = 257.5
assert allocator.max_task_output_bytes_to_read(o3) == 257.5

# Test global_limits updated.
Expand All @@ -419,7 +419,7 @@ def mock_get_global_limits():
# +-----+------------------+------------------+--------------+
# | op2 | 100/0 | 100/0 | 400-100=300 |
# +-----+------------------+------------------+--------------+
# | op3 | (30+50)/20 | 25/75 | 0 |
# | op3 | 30/70 | (25+50)/25 | 0 |
# +-----+------------------+------------------+--------------+
# remaining shared = 800/2 - 300 = 100
# Test reserved resources for o2 and o3.
Expand All @@ -433,14 +433,14 @@ def mock_get_global_limits():
# Test budgets.
# memory_budget[o2] = 0 + 100/2 = 50
assert allocator._op_budgets[o2] == ExecutionResources(1.5, float("inf"), 50)
# memory_budget[o3] = 20 + 100/2 = 70
assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 70)
# memory_budget[o3] = 70 + 100/2 = 120
assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 120)
# Test can_submit_new_task and max_task_output_bytes_to_read.
assert allocator.can_submit_new_task(o2)
assert allocator.can_submit_new_task(o3)
# max_task_output_bytes_to_read(o2) = 50 + 0 = 50
assert allocator.max_task_output_bytes_to_read(o2) == 50
# max_task_output_bytes_to_read(o3) = 70 + 75 = 145
# max_task_output_bytes_to_read(o3) = 120 + 25 = 145
assert allocator.max_task_output_bytes_to_read(o3) == 145

def test_reserve_incremental_resource_usage(self, restore_data_context):
Expand Down

0 comments on commit 5f99c8e

Please sign in to comment.