diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index e4efbed42bdc..431988ad6d00 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -96,6 +96,7 @@ def _estimate_object_store_memory(self, op, state) -> int: # Op's external output buffer. mem_op_outputs = state.outqueue_memory_usage() + # Input buffers of the downstream operators. for next_op in op.output_dependencies: mem_op_outputs += ( next_op.metrics.obj_store_mem_internal_inqueue @@ -270,7 +271,7 @@ class ReservationOpResourceAllocator(OpResourceAllocator): num_map_ops` resources, half of which is reserved only for the operator outputs, excluding pending task outputs. 3. Non-reserved resources are shared among all operators. - 3. In each scheduling iteration, each map operator will get "remaining of their own + 4. In each scheduling iteration, each map operator will get "remaining of their own reserved resources" + "remaining of shared resources / num_map_ops" resources. The `reservation_ratio` is set to 50% by default. Users can tune this value to @@ -353,8 +354,8 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float): # i.e., `RessourceManager._mem_op_outputs`. # # Note, if we don't reserve memory for op outputs, all the budget may be used by - # the pending task outputs, and/or op' internal output buffers (the latter can - # happen when `preserve_order=True`. + # the pending task outputs, and/or op's internal output buffers (the latter can + # happen when `preserve_order=True`). # Then we'll have no budget to pull blocks from the op. self._reserved_for_op_outputs: Dict[PhysicalOperator, int] = {} # Total shared resources. @@ -486,21 +487,26 @@ def _should_unblock_streaming_output_backpressure( return True return False - def _op_outputs_reserved_remaining(self, op: PhysicalOperator) -> int: + def _get_op_outputs_usage_with_downstream(self, op: PhysicalOperator) -> int: + """Get the outputs memory usage of the given operator, including the downstream + non-Map operators. + """ # Outputs usage of the current operator. - outputs_usage = self._resource_manager._mem_op_outputs[op] + op_outputs_usage = self._resource_manager._mem_op_outputs[op] # Also account the downstream non-Map operators' memory usage. - outputs_usage += sum( + op_outputs_usage += sum( self._resource_manager.get_op_usage(next_op).object_store_memory for next_op in self._get_downstream_non_map_ops(op) ) - return max(self._reserved_for_op_outputs[op] - outputs_usage, 0) + return op_outputs_usage def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: if op not in self._op_budgets: return None res = self._op_budgets[op].object_store_memory - res += self._op_outputs_reserved_remaining(op) + # Add the remaining of `_reserved_for_op_outputs`. + op_outputs_usage = self._get_op_outputs_usage_with_downstream(op) + res += max(self._reserved_for_op_outputs[op] - op_outputs_usage, 0) assert res >= 0 if res == 0 and self._should_unblock_streaming_output_backpressure(op): res = 1 @@ -550,20 +556,10 @@ def update_usages(self): # Add the memory usage of the operator itself, # excluding `_reserved_for_op_outputs`. op_mem_usage += self._resource_manager._mem_op_internal[op] - op_mem_usage += max( - self._resource_manager._mem_op_outputs[op] - - self._reserved_for_op_outputs[op], - 0, - ) - # Also account the downstream non-Map operators' memory usage - # to the current Map operator. - # This is because we don't directly throttle non-Map operators. - # So if they are using too much memory, we should throttle their - # upstream Map operator. - op_mem_usage += sum( - self._resource_manager.get_op_usage(next_op).object_store_memory - for next_op in self._get_downstream_non_map_ops(op) - ) + # Add the portion of op outputs usage that has + # exceeded `_reserved_for_op_outputs`. + op_outputs_usage = self._get_op_outputs_usage_with_downstream(op) + op_mem_usage += max(op_outputs_usage - self._reserved_for_op_outputs[op], 0) op_usage = copy.deepcopy(self._resource_manager.get_op_usage(op)) op_usage.object_store_memory = op_mem_usage op_reserved = self._op_reserved[op] diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index 657985a3b706..6502f3038ec7 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -211,12 +211,12 @@ def test_update_usage(self): assert op_usage.object_store_memory == expected_mem if op != o1: assert ( - resource_manager._mem_pending_task_outputs[op] - == mock_pending_task_outputs[op] + resource_manager._mem_op_internal[op] + == mock_pending_task_outputs[op] + mock_internal_outqueue[op] ) assert ( resource_manager._mem_op_outputs[op] - == expected_mem - mock_pending_task_outputs[op] + == expected_mem - resource_manager._mem_op_internal[op] ) global_cpu += mock_cpu[op] global_mem += expected_mem @@ -319,7 +319,7 @@ def test_basic(self, restore_data_context): o4 = LimitOperator(1, o3) op_usages = {op: ExecutionResources.zero() for op in [o1, o2, o3, o4]} - pending_task_outputs_usages = {op: 0 for op in [o1, o2, o3, o4]} + op_internal_usage = {op: 0 for op in [o1, o2, o3, o4]} op_outputs_usages = {op: 0 for op in [o1, o2, o3, o4]} topo, _ = build_streaming_topology(o4, ExecutionOptions()) @@ -332,7 +332,7 @@ def mock_get_global_limits(): resource_manager = ResourceManager(topo, ExecutionOptions()) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) - resource_manager._mem_pending_task_outputs = pending_task_outputs_usages + resource_manager._mem_op_internal = op_internal_usage resource_manager._mem_op_outputs = op_outputs_usages resource_manager.get_global_limits = MagicMock( @@ -378,10 +378,10 @@ def mock_get_global_limits(): # Test when each operator uses some resources. op_usages[o2] = ExecutionResources(6, 0, 500) - pending_task_outputs_usages[o2] = 400 + op_internal_usage[o2] = 400 op_outputs_usages[o2] = 100 op_usages[o3] = ExecutionResources(2, 0, 125) - pending_task_outputs_usages[o3] = 30 + op_internal_usage[o3] = 30 op_outputs_usages[o3] = 25 op_usages[o4] = ExecutionResources(0, 0, 50) @@ -393,20 +393,20 @@ def mock_get_global_limits(): # +-----+------------------+------------------+--------------+ # | op2 | 125/0 | 100/25 | 400-125=275 | # +-----+------------------+------------------+--------------+ - # | op3 | (30+50)/45 | 25/100 | 0 | + # | op3 | 30/95 | (25+50)/50 | 0 | # +-----+------------------+------------------+--------------+ # remaining shared = 1000/2 - 275 = 225 # Test budgets. # memory_budget[o2] = 0 + 225/2 = 112.5 assert allocator._op_budgets[o2] == ExecutionResources(3, float("inf"), 112.5) - # memory_budget[o3] = 45 + 225/2 = 157.5 - assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 157.5) + # memory_budget[o3] = 95 + 225/2 = 207.5 + assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 207.5) # Test can_submit_new_task and max_task_output_bytes_to_read. assert allocator.can_submit_new_task(o2) assert allocator.can_submit_new_task(o3) # max_task_output_bytes_to_read(o2) = 112.5 + 25 = 137.5 assert allocator.max_task_output_bytes_to_read(o2) == 137.5 - # max_task_output_bytes_to_read(o3) = 157.5 + 100 = 257.5 + # max_task_output_bytes_to_read(o3) = 207.5 + 50 = 257.5 assert allocator.max_task_output_bytes_to_read(o3) == 257.5 # Test global_limits updated. @@ -419,7 +419,7 @@ def mock_get_global_limits(): # +-----+------------------+------------------+--------------+ # | op2 | 100/0 | 100/0 | 400-100=300 | # +-----+------------------+------------------+--------------+ - # | op3 | (30+50)/20 | 25/75 | 0 | + # | op3 | 30/70 | (25+50)/25 | 0 | # +-----+------------------+------------------+--------------+ # remaining shared = 800/2 - 300 = 100 # Test reserved resources for o2 and o3. @@ -433,14 +433,14 @@ def mock_get_global_limits(): # Test budgets. # memory_budget[o2] = 0 + 100/2 = 50 assert allocator._op_budgets[o2] == ExecutionResources(1.5, float("inf"), 50) - # memory_budget[o3] = 20 + 100/2 = 70 - assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 70) + # memory_budget[o3] = 70 + 100/2 = 120 + assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 120) # Test can_submit_new_task and max_task_output_bytes_to_read. assert allocator.can_submit_new_task(o2) assert allocator.can_submit_new_task(o3) # max_task_output_bytes_to_read(o2) = 50 + 0 = 50 assert allocator.max_task_output_bytes_to_read(o2) == 50 - # max_task_output_bytes_to_read(o3) = 70 + 75 = 145 + # max_task_output_bytes_to_read(o3) = 120 + 25 = 145 assert allocator.max_task_output_bytes_to_read(o3) == 145 def test_reserve_incremental_resource_usage(self, restore_data_context):