diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index e807c60e6bea..64e06211fcae 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -450,14 +450,6 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) -py_test( - name = "test_runtime_metrics_scheduling", - size = "small", - srcs = ["tests/test_runtime_metrics_scheduling.py"], - tags = ["team:data", "exclusive"], - deps = ["//:ray_lib", ":conftest"], -) - py_test( name = "test_size_estimation", size = "medium", @@ -522,14 +514,6 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) -py_test( - name = "test_streaming_backpressure_edge_case", - size = "medium", - srcs = ["tests/test_streaming_backpressure_edge_case.py"], - tags = ["team:data", "exclusive"], - deps = ["//:ray_lib", ":conftest"], -) - py_test( name = "test_transform_pyarrow", size = "small", @@ -561,3 +545,11 @@ py_test( tags = ["team:data", "exclusive"], deps = ["//:ray_lib", ":conftest"], ) + +py_test( + name = "test_backpressure_e2e", + size = "medium", + srcs = ["tests/test_backpressure_e2e.py"], + tags = ["team:data", "exclusive"], + deps = ["//:ray_lib", ":conftest"], +) diff --git a/python/ray/data/_internal/execution/backpressure_policy/__init__.py b/python/ray/data/_internal/execution/backpressure_policy/__init__.py index 57d52f96460f..a9d6ac177e97 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/__init__.py +++ b/python/ray/data/_internal/execution/backpressure_policy/__init__.py @@ -3,15 +3,15 @@ import ray from .backpressure_policy import BackpressurePolicy from .concurrency_cap_backpressure_policy import ConcurrencyCapBackpressurePolicy -from .streaming_output_backpressure_policy import StreamingOutputBackpressurePolicy if TYPE_CHECKING: from ray.data._internal.execution.streaming_executor_state import Topology # Default enabled backpressure policies and its config key. # Use `DataContext.set_config` to config it. -# TODO(hchen): Enable StreamingOutputBackpressurePolicy by default. -ENABLED_BACKPRESSURE_POLICIES = [ConcurrencyCapBackpressurePolicy] +ENABLED_BACKPRESSURE_POLICIES = [ + ConcurrencyCapBackpressurePolicy, +] ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled" @@ -27,7 +27,6 @@ def get_backpressure_policies(topology: "Topology"): __all__ = [ "BackpressurePolicy", "ConcurrencyCapBackpressurePolicy", - "StreamingOutputBackpressurePolicy", "ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY", "get_backpressure_policies", ] diff --git a/python/ray/data/_internal/execution/backpressure_policy/backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/backpressure_policy.py index 5f5e973f04fe..6577936e1dd6 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/backpressure_policy.py @@ -1,11 +1,11 @@ from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Dict +from typing import TYPE_CHECKING if TYPE_CHECKING: from ray.data._internal.execution.interfaces.physical_operator import ( PhysicalOperator, ) - from ray.data._internal.execution.streaming_executor_state import OpState, Topology + from ray.data._internal.execution.streaming_executor_state import Topology class BackpressurePolicy(ABC): @@ -15,24 +15,6 @@ class BackpressurePolicy(ABC): def __init__(self, topology: "Topology"): ... - def calculate_max_blocks_to_read_per_op( - self, topology: "Topology" - ) -> Dict["OpState", int]: - """Determine how many blocks of data we can read from each operator. 
- The `DataOpTask`s of the operators will stop reading blocks when the limit is - reached. Then the execution of these tasks will be paused when the streaming - generator backpressure threshold is reached. - Used in `streaming_executor_state.py::process_completed_tasks()`. - - Returns: A dict mapping from each operator's OpState to the desired number of - blocks to read. For operators that are not in the dict, all available blocks - will be read. - - Note: Only one backpressure policy that implements this method can be enabled - at a time. - """ - return {} - def can_add_input(self, op: "PhysicalOperator") -> bool: """Determine if we can add a new input to the operator. If returns False, the operator will be backpressured and will not be able to run new tasks. diff --git a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py deleted file mode 100644 index 156c65c0f1a1..000000000000 --- a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py +++ /dev/null @@ -1,143 +0,0 @@ -import time -from collections import defaultdict -from typing import TYPE_CHECKING, Dict, Tuple - -import ray -from .backpressure_policy import BackpressurePolicy -from ray.data._internal.dataset_logger import DatasetLogger - -if TYPE_CHECKING: - from ray.data._internal.execution.interfaces import PhysicalOperator - from ray.data._internal.execution.streaming_executor_state import OpState, Topology - - -logger = DatasetLogger(__name__) - - -class StreamingOutputBackpressurePolicy(BackpressurePolicy): - """A backpressure policy that throttles the streaming outputs of the `DataOpTask`s. - - The are 2 levels of configs to control the behavior: - - At the Ray Core level, we use - `MAX_BLOCKS_IN_GENERATOR_BUFFER` to limit the number of blocks buffered in - the streaming generator of each OpDataTask. When it's reached, the task will - be blocked at `yield` until the caller reads another `ObjectRef. - - At the Ray Data level, we use - `MAX_BLOCKS_IN_OP_OUTPUT_QUEUE` to limit the number of blocks buffered in the - output queue of each operator. When it's reached, we'll stop reading from the - streaming generators of the op's tasks, and thus trigger backpressure at the - Ray Core level. - - Thus, total number of buffered blocks for each operator can be - `MAX_BLOCKS_IN_GENERATOR_BUFFER * num_running_tasks + - MAX_BLOCKS_IN_OP_OUTPUT_QUEUE`. - """ - - # The max number of blocks that can be buffered at the streaming generator - # of each `DataOpTask`. - MAX_BLOCKS_IN_GENERATOR_BUFFER = 4 - MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = ( - "backpressure_policies.streaming_output.max_blocks_in_generator_buffer" - ) - # The max number of blocks that can be buffered at the operator output queue - # (`OpState.outqueue`). - MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20 - MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY = ( - "backpressure_policies.streaming_output.max_blocks_in_op_output_queue" - ) - - # If an operator has active tasks but no outputs for at least this time, - # we'll consider it as idle and temporarily unblock backpressure for its upstream. 
- MAX_OUTPUT_IDLE_SECONDS = 10 - - def __init__(self, topology: "Topology"): - data_context = ray.data.DataContext.get_current() - data_context._max_num_blocks_in_streaming_gen_buffer = data_context.get_config( - self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, - self.MAX_BLOCKS_IN_GENERATOR_BUFFER, - ) - assert data_context._max_num_blocks_in_streaming_gen_buffer > 0 - - self._max_num_blocks_in_op_output_queue = data_context.get_config( - self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY, - self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE, - ) - assert self._max_num_blocks_in_op_output_queue > 0 - - # Latest number of outputs and the last time when the number changed - # for each op. - self._last_num_outputs_and_time: Dict[ - "PhysicalOperator", Tuple[int, float] - ] = defaultdict(lambda: (0, time.time())) - self._warning_printed = False - - def calculate_max_blocks_to_read_per_op( - self, topology: "Topology" - ) -> Dict["OpState", int]: - max_blocks_to_read_per_op: Dict["OpState", int] = {} - - # Indicates if the immediate downstream operator is idle. - downstream_idle = False - - for op, state in reversed(topology.items()): - max_blocks_to_read_per_op[state] = ( - self._max_num_blocks_in_op_output_queue - state.outqueue_num_blocks() - ) - - if downstream_idle: - max_blocks_to_read_per_op[state] = max( - max_blocks_to_read_per_op[state], - 1, - ) - - # An operator is considered idle if either of the following is true: - # - It has no active tasks. - # - This can happen when all resources are used by upstream operators. - # - It has active tasks, but no outputs for at least - # `MAX_OUTPUT_IDLE_SECONDS`. - # - This can happen when non-Data code preempted cluster resources, and - # - some of the active tasks don't actually have enough resources to run. - # - # If the operator is idle, we'll temporarily unblock backpressure by - # allowing reading at least one block from its upstream - # to avoid deadlock. - # NOTE, these 2 conditions don't necessarily mean deadlock. - # The first case can also happen when the upstream operator hasn't outputted - # any blocks yet. While the second case can also happen when the task is - # expected to output data slowly. - # The false postive cases are fine as we only allow reading one block - # each time. - downstream_idle = False - if op.num_active_tasks() == 0: - downstream_idle = True - else: - cur_num_outputs = state.op.metrics.num_task_outputs_generated - cur_time = time.time() - last_num_outputs, last_time = self._last_num_outputs_and_time[state.op] - if cur_num_outputs > last_num_outputs: - self._last_num_outputs_and_time[state.op] = ( - cur_num_outputs, - cur_time, - ) - else: - if cur_time - last_time > self.MAX_OUTPUT_IDLE_SECONDS: - downstream_idle = True - self._print_warning(state.op, cur_time - last_time) - return max_blocks_to_read_per_op - - def _print_warning(self, op: "PhysicalOperator", idle_time: float): - if self._warning_printed: - return - self._warning_printed = True - msg = ( - f"Operator {op} is running but has no outputs for {idle_time} seconds." - " Execution may be slower than expected.\n" - "Ignore this warning if your UDF is expected to be slow." - " Otherwise, this can happen when there are fewer cluster resources" - " available to Ray Data than expected." - " If you have non-Data tasks or actors running in the cluster, exclude" - " their resources from Ray Data with" - " `DataContext.get_current().execution_options.exclude_resources`." - " This message will only print once." 
- ) - logger.get_logger(log_to_stdout=False).warning(msg) diff --git a/python/ray/data/_internal/execution/interfaces/execution_options.py b/python/ray/data/_internal/execution/interfaces/execution_options.py index fa7cbaa1da4d..9b5a03ad1309 100644 --- a/python/ray/data/_internal/execution/interfaces/execution_options.py +++ b/python/ray/data/_internal/execution/interfaces/execution_options.py @@ -33,6 +33,18 @@ def inf(cls) -> "ExecutionResources": """Returns an ExecutionResources object with infinite resources.""" return ExecutionResources(float("inf"), float("inf"), float("inf")) + def is_zero(self) -> bool: + """Returns True if all resources are zero.""" + return self.cpu == 0.0 and self.gpu == 0.0 and self.object_store_memory == 0 + + def is_non_negative(self) -> bool: + """Returns True if all resources are non-negative.""" + return ( + (self.cpu is None or self.cpu >= 0) + and (self.gpu is None or self.gpu >= 0) + and (self.object_store_memory is None or self.object_store_memory >= 0) + ) + def object_store_memory_str(self) -> str: """Returns a human-readable string for the object store memory field.""" if self.object_store_memory is None: @@ -92,13 +104,24 @@ def max(self, other: "ExecutionResources") -> "ExecutionResources": def min(self, other: "ExecutionResources") -> "ExecutionResources": """Returns the minimum for each resource type.""" + cpu1 = self.cpu if self.cpu is not None else float("inf") + cpu2 = other.cpu if other.cpu is not None else float("inf") + gpu1 = self.gpu if self.gpu is not None else float("inf") + gpu2 = other.gpu if other.gpu is not None else float("inf") + object_store_memory1 = ( + self.object_store_memory + if self.object_store_memory is not None + else float("inf") + ) + object_store_memory2 = ( + other.object_store_memory + if other.object_store_memory is not None + else float("inf") + ) return ExecutionResources( - cpu=min(self.cpu or float("inf"), other.cpu or float("inf")), - gpu=min(self.gpu or float("inf"), other.gpu or float("inf")), - object_store_memory=min( - self.object_store_memory or float("inf"), - other.object_store_memory or float("inf"), - ), + cpu=min(cpu1, cpu2), + gpu=min(gpu1, gpu2), + object_store_memory=min(object_store_memory1, object_store_memory2), ) def satisfies_limit(self, limit: "ExecutionResources") -> bool: diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index 558b4b8ea4ec..00591c5220b7 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -65,16 +65,16 @@ def __init__( def get_waitable(self) -> ObjectRefGenerator: return self._streaming_gen - def on_data_ready(self, max_blocks_to_read: Optional[int]) -> int: + def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int: """Callback when data is ready to be read from the streaming generator. Args: - max_blocks_to_read: Max number of blocks to read. If None, all available + max_bytes_to_read: Max bytes of blocks to read. If None, all available will be read. Returns: The number of blocks read. 
""" - num_blocks_read = 0 - while max_blocks_to_read is None or num_blocks_read < max_blocks_to_read: + bytes_read = 0 + while max_bytes_to_read is None or bytes_read < max_bytes_to_read: try: block_ref = self._streaming_gen._next_sync(0) if block_ref.is_nil(): @@ -103,8 +103,8 @@ def on_data_ready(self, max_blocks_to_read: Optional[int]) -> int: self._output_ready_callback( RefBundle([(block_ref, meta)], owns_blocks=True) ) - num_blocks_read += 1 - return num_blocks_read + bytes_read += meta.size_bytes + return bytes_read class MetadataOpTask(OpTask): @@ -386,11 +386,16 @@ def base_resource_usage(self) -> ExecutionResources: """ return ExecutionResources() - def incremental_resource_usage(self) -> ExecutionResources: + def incremental_resource_usage( + self, consider_autoscaling=True + ) -> ExecutionResources: """Returns the incremental resources required for processing another input. For example, an operator that launches a task per input could return ExecutionResources(cpu=1) as its incremental usage. + + Args: + consider_autoscaling: Whether to consider the possibility of autoscaling. """ return ExecutionResources() diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 20347694e075..aee5a04f225a 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -326,10 +326,12 @@ def current_processor_usage(self) -> ExecutionResources: gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, ) - def incremental_resource_usage(self) -> ExecutionResources: + def incremental_resource_usage( + self, consider_autoscaling=True + ) -> ExecutionResources: # We would only have nonzero incremental CPU/GPU resources if a new task would # require scale-up to run. 
- if self._autoscaling_policy.should_scale_up( + if consider_autoscaling and self._autoscaling_policy.should_scale_up( num_total_workers=self._actor_pool.num_total_actors(), num_running_workers=self._actor_pool.num_running_actors(), ): @@ -345,7 +347,8 @@ def incremental_resource_usage(self) -> ExecutionResources: return ExecutionResources( cpu=num_cpus, gpu=num_gpus, - object_store_memory=self._metrics.average_bytes_outputs_per_task, + object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task + or 0, ) def _extra_metrics(self) -> Dict[str, Any]: diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index ba616a69cec4..c28681f91add 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -396,7 +396,9 @@ def base_resource_usage(self) -> ExecutionResources: raise NotImplementedError @abstractmethod - def incremental_resource_usage(self) -> ExecutionResources: + def incremental_resource_usage( + self, consider_autoscaling=True + ) -> ExecutionResources: raise NotImplementedError diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index 492ee8a3bdc3..2d84dd1bc111 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -108,11 +108,14 @@ def current_processor_usage(self) -> ExecutionResources: gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, ) - def incremental_resource_usage(self) -> ExecutionResources: + def incremental_resource_usage( + self, consider_autoscaling=True + ) -> ExecutionResources: return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0), gpu=self._ray_remote_args.get("num_gpus", 0), - object_store_memory=self._metrics.average_bytes_outputs_per_task, + object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task + or 0, ) def get_concurrency(self) -> Optional[int]: diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 4481817c74a9..b59076845622 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -1,23 +1,34 @@ +import copy +import os import time from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Dict, Optional +from collections import defaultdict +from typing import TYPE_CHECKING, Dict, List, Optional import ray +from ray.data._internal.dataset_logger import DatasetLogger from ray.data._internal.execution.interfaces.execution_options import ( ExecutionOptions, ExecutionResources, ) from ray.data._internal.execution.interfaces.physical_operator import PhysicalOperator +from ray.data._internal.execution.operators.actor_pool_map_operator import ( + ActorPoolMapOperator, +) from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.limit_operator import LimitOperator from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.output_splitter import OutputSplitter +from ray.data._internal.execution.util import memory_string from ray.data.context import DataContext if TYPE_CHECKING: from 
ray.data._internal.execution.streaming_executor_state import Topology +logger = DatasetLogger(__name__) + + class ResourceManager: """A class that manages the resource usage of a streaming executor.""" @@ -26,7 +37,7 @@ class ResourceManager: # The fraction of the object store capacity that will be used as the default object # store memory limit for the streaming executor. - DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION = 0.25 + DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION = 0.5 # Memory accounting is accurate only for these operators. # We'll enable memory reservation if a dataset only contains these operators. @@ -44,11 +55,18 @@ def __init__(self, topology: "Topology", options: ExecutionOptions): self._global_limits_last_update_time = 0 self._global_usage = ExecutionResources.zero() self._op_usages: Dict[PhysicalOperator, ExecutionResources] = {} + # Object store memory usage of the pending task outputs. + self._mem_pending_task_outputs: Dict[PhysicalOperator, int] = defaultdict(int) + # Object store memory usage of the internal/external output buffers, and + # the output buffers of the output dependency operators. + self._mem_op_outputs: Dict[PhysicalOperator, int] = defaultdict(int) + # Whether to print debug information. + self._debug = os.environ.get("RAY_DATA_DEBUG_RESOURCE_MANAGER", "0") == "1" self._downstream_fraction: Dict[PhysicalOperator, float] = {} self._downstream_object_store_memory: Dict[PhysicalOperator, int] = {} - self._op_resource_limiter: Optional["OpResourceLimiter"] = None + self._op_resource_allocator: Optional["OpResourceAllocator"] = None ctx = DataContext.get_current() if ctx.op_resource_reservation_enabled: @@ -58,10 +76,33 @@ def __init__(self, topology: "Topology", options: ExecutionOptions): should_enable = False break if should_enable: - self._op_resource_limiter = ReservationOpResourceLimiter( + self._op_resource_allocator = ReservationOpResourceAllocator( self, ctx.op_resource_reservation_ratio ) + def _estimate_object_store_memory(self, op, state) -> int: + # Don't count input refs towards dynamic memory usage, as they have been + # pre-created already outside this execution. + if isinstance(op, InputDataBuffer): + return 0 + + pending_task_outputs = op.metrics.obj_store_mem_pending_task_outputs or 0 + + output_buffers = op.metrics.obj_store_mem_internal_outqueue + output_buffers += state.outqueue_memory_usage() + + next_op_input_buffers = 0 + for next_op in op.output_dependencies: + next_op_input_buffers += ( + next_op.metrics.obj_store_mem_internal_inqueue + + next_op.metrics.obj_store_mem_pending_task_inputs + ) + + self._mem_pending_task_outputs[op] = pending_task_outputs + self._mem_op_outputs[op] = output_buffers + next_op_input_buffers + + return pending_task_outputs + output_buffers + next_op_input_buffers + def update_usages(self): """Recalculate resource usages.""" # TODO(hchen): This method will be called frequently during the execution loop. @@ -79,7 +120,7 @@ def update_usages(self): # Update `self._op_usages`. op_usage = op.current_processor_usage() assert not op_usage.object_store_memory - op_usage.object_store_memory = _estimate_object_store_memory(op, state) + op_usage.object_store_memory = self._estimate_object_store_memory(op, state) self._op_usages[op] = op_usage # Update `self._global_usage`. 
self._global_usage = self._global_usage.add(op_usage) @@ -92,8 +133,8 @@ def update_usages(self): op ] = self._global_usage.object_store_memory - if self._op_resource_limiter is not None: - self._op_resource_limiter.update_usages() + if self._op_resource_allocator is not None: + self._op_resource_allocator.update_usages() def get_global_usage(self) -> ExecutionResources: """Return the global resource usage at the current time.""" @@ -141,6 +182,25 @@ def get_op_usage(self, op: PhysicalOperator) -> ExecutionResources: """Return the resource usage of the given operator at the current time.""" return self._op_usages[op] + def get_op_usage_str(self, op: PhysicalOperator) -> str: + """Return a human-readable string representation of the resource usage of + the given operator.""" + usage_str = f"cpu: {self._op_usages[op].cpu:.1f}" + if self._op_usages[op].gpu: + usage_str += f", gpu: {self._op_usages[op].gpu:.1f}" + usage_str += f", objects: {self._op_usages[op].object_store_memory_str()}" + if self._debug: + usage_str += ( + f" (task_pending: {memory_string(self._mem_pending_task_outputs[op])}, " # noqa + f"op_outputs: {memory_string(self._mem_op_outputs[op])})" + ) + if ( + self.op_resource_allocator_enabled() + and op in self._op_resource_allocator._op_budgets + ): + usage_str += f", budget: {self._op_resource_allocator._op_budgets[op]}" + return usage_str + def get_downstream_fraction(self, op: PhysicalOperator) -> float: """Return the downstream fraction of the given operator.""" return self._downstream_fraction[op] @@ -149,42 +209,23 @@ def get_downstream_object_store_memory(self, op: PhysicalOperator) -> int: """Return the downstream object store memory usage of the given operator.""" return self._downstream_object_store_memory[op] - def op_resource_limiter_enabled(self) -> bool: - """Return whether OpResourceLimiter is enabled.""" - return self._op_resource_limiter is not None + def op_resource_allocator_enabled(self) -> bool: + """Return whether OpResourceAllocator is enabled.""" + return self._op_resource_allocator is not None - def get_op_limits(self, op: PhysicalOperator) -> ExecutionResources: - """Get the limit of resources that the given op can use at the current time. - - This method can only be called when `op_resource_limiter_enabled` returns True. - """ - assert self._op_resource_limiter is not None - return self._op_resource_limiter.get_op_limits(op) - - -def _estimate_object_store_memory(op, state) -> int: - # Don't count input refs towards dynamic memory usage, as they have been - # pre-created already outside this execution. - if isinstance(op, InputDataBuffer): - return 0 - - object_store_memory = op.metrics.obj_store_mem_internal_outqueue - if op.metrics.obj_store_mem_pending_task_outputs is not None: - object_store_memory += op.metrics.obj_store_mem_pending_task_outputs - object_store_memory += state.outqueue_memory_usage() - for next_op in op.output_dependencies: - object_store_memory += ( - next_op.metrics.obj_store_mem_internal_inqueue - + next_op.metrics.obj_store_mem_pending_task_inputs - ) - return object_store_memory + @property + def op_resource_allocator(self) -> "OpResourceAllocator": + """Return the OpResourceAllocator.""" + assert self._op_resource_allocator is not None + return self._op_resource_allocator -class OpResourceLimiter(ABC): - """Interface for limiting resources for each operator. +class OpResourceAllocator(ABC): + """An interface for dynamic operator resource allocation. 
-    This interface allows limit the resources that each operator can use, in each
-    scheduling iteration.
+    This interface allows dynamically allocating available resources to each operator,
+    limiting how many tasks each operator can submit, and how much data each operator
+    can read from its running tasks.
     """
 
     def __init__(self, resource_manager: ResourceManager):
@@ -196,13 +237,19 @@ def update_usages(self) -> ExecutionResources:
         ...
 
     @abstractmethod
-    def get_op_limits(self, op: PhysicalOperator) -> ExecutionResources:
-        """Get the limit of resources that the given op can use at the current time."""
+    def can_submit_new_task(self, op: PhysicalOperator) -> bool:
+        """Return whether the given operator can submit a new task."""
         ...
 
+    @abstractmethod
+    def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
+        """Return the maximum bytes of pending task outputs that can be read for
+        the given operator. None means no limit."""
+        ...
 
 
-class ReservationOpResourceLimiter(OpResourceLimiter):
-    """An OpResourceLimiter implementation that reserves resources for each operator.
+
+class ReservationOpResourceAllocator(OpResourceAllocator):
+    """An OpResourceAllocator implementation that reserves resources for each operator.
 
     This class reserves memory and CPU resources for map operators, and consider
     runtime resource usages to limit the resources that each operator can use.
@@ -210,7 +257,9 @@ class ReservationOpResourceLimiter(OpResourceLimiter):
 
     It works in the following way:
    1. Currently we only limit map operators. Non-map operators get unlimited resources.
     2. For each map operator, we reserve `reservation_ratio * global_resources /
-       num_map_ops` resources. The remaining are shared among all map operators.
+       num_map_ops` resources, half of which is reserved only for the operator outputs,
+       excluding pending task outputs.
+    3. Non-reserved resources are shared among all operators.
     3. In each scheduling iteration, each map operator will get "remaining of their own
        reserved resources" + "remaining of shared resources / num_map_ops" resources.
@@ -220,81 +269,307 @@ class ReservationOpResourceLimiter(OpResourceLimiter):
     worse performance. And vice versa.
     """
 
+    class IdleDetector:
+        """Utility class for detecting idle operators.
+
+        Note, stalling can happen when there are fewer resources than the Data executor
+        expects. E.g., when some resources are preempted by non-Data code, see
+        `test_no_deadlock_on_resource_contention` as an example.
+
+        This class is used to detect potential stalling and allow the execution
+        to make progress.
+        """
+
+        # The interval to detect idle operators.
+        # When downstream is idle, we'll allow reading at least one task output
+        # per this interval.
+        DETECTION_INTERVAL_S = 1.0
+        # Print a warning if an operator is idle for this time.
+        WARN_ON_IDLE_TIME_S = 60.0
+        # Whether a warning has been printed.
+        _warn_printed = False
+
+        def __init__(self):
+            # per-op fields
+            self.last_num_outputs = defaultdict(int)
+            self.last_output_time = defaultdict(lambda: time.time())
+            self.last_detection_time = defaultdict(lambda: time.time())
+
+        def detect_idle(self, op: PhysicalOperator):
+            cur_time = time.time()
+            if cur_time - self.last_detection_time[op] > self.DETECTION_INTERVAL_S:
+                cur_num_outputs = op.metrics.num_task_outputs_generated
+                if cur_num_outputs > self.last_num_outputs[op]:
+                    self.last_num_outputs[op] = cur_num_outputs
+                    self.last_output_time[op] = cur_time
+                    self.last_detection_time[op] = cur_time
+                else:
+                    self.last_detection_time[op] = cur_time
+                    self.print_warning_if_idle_for_too_long(
+                        op, cur_time - self.last_output_time[op]
+                    )
+                    return True
+            return False
+
+        @classmethod
+        def print_warning_if_idle_for_too_long(
+            cls, op: PhysicalOperator, idle_time: float
+        ):
+            """Print a warning if an operator is idle for too long."""
+            if idle_time < cls.WARN_ON_IDLE_TIME_S or cls._warn_printed:
+                return
+            cls._warn_printed = True
+            msg = (
+                f"Operator {op} is running but has no outputs for {idle_time} seconds."
+                " Execution may be slower than expected.\n"
+                "Ignore this warning if your UDF is expected to be slow."
+                " Otherwise, this can happen when there are fewer cluster resources"
+                " available to Ray Data than expected."
+                " If you have non-Data tasks or actors running in the cluster, exclude"
+                " their resources from Ray Data with"
+                " `DataContext.get_current().execution_options.exclude_resources`."
+                " This message will only print once."
+            )
+            logger.get_logger(log_to_stdout=True).warning(msg)
+
     def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
         super().__init__(resource_manager)
         self._reservation_ratio = reservation_ratio
         assert 0.0 <= self._reservation_ratio <= 1.0
-        # We only limit map operators.
-        self._eligible_ops = [
-            op for op in self._resource_manager._topology if isinstance(op, MapOperator)
-        ]
-        # Per-op reserved resources.
+        # Per-op reserved resources, excluding `_reserved_for_op_outputs`.
         self._op_reserved: Dict[PhysicalOperator, ExecutionResources] = {}
+        # Memory reserved exclusively for the outputs of each operator.
+        # "Op outputs" refer to the operator's internal and external output buffers
+        # and next operator's input buffers, but not including the pending task outputs.
+        #
+        # Note, if we don't reserve memory for op outputs, all the budget may be used
+        # by the pending task outputs. Then we'll have no budget to pull the outputs
+        # from the running tasks.
+        self._reserved_for_op_outputs: Dict[PhysicalOperator, int] = {}
         # Total shared resources.
         self._total_shared = ExecutionResources.zero()
-        # Resource limits for each operator.
-        self._op_limits: Dict[PhysicalOperator, ExecutionResources] = {}
+        # Resource budgets for each operator, excluding `_reserved_for_op_outputs`.
+        self._op_budgets: Dict[PhysicalOperator, ExecutionResources] = {}
+        # Whether each operator has reserved the minimum resources to run
+        # at least one task.
+        # This is used to avoid edge cases where the entire resource limits are not
+        # enough to run one task of each op.
+        # See `test_no_deadlock_on_small_cluster_resources` as an example.
+        self._reserved_min_resources: Dict[PhysicalOperator, bool] = {}
+
         self._cached_global_limits = ExecutionResources.zero()
+        self._cached_num_eligible_ops = 0
+
+        self._idle_detector = self.IdleDetector()
 
-    def _on_global_limits_updated(self, global_limits: ExecutionResources):
-        if len(self._eligible_ops) == 0:
+    def _get_eligible_ops(self) -> List[PhysicalOperator]:
+        # Only consider map operators that are not completed.
+        return [
+            op
+            for op in self._resource_manager._topology
+            if isinstance(op, MapOperator) and not op.completed()
+        ]
+
+    def _update_reservation(self):
+        global_limits = self._resource_manager.get_global_limits()
+        eligible_ops = self._get_eligible_ops()
+
+        if (
+            global_limits == self._cached_global_limits
+            and len(eligible_ops) == self._cached_num_eligible_ops
+        ):
             return
+        self._cached_global_limits = global_limits
+        self._cached_num_eligible_ops = len(eligible_ops)
+
+        self._op_reserved.clear()
+        self._reserved_for_op_outputs.clear()
+        self._reserved_min_resources.clear()
+        self._total_shared = copy.deepcopy(global_limits)
 
-        self._total_shared = global_limits.scale(1.0 - self._reservation_ratio)
+        if len(eligible_ops) == 0:
+            return
 
+        # Reserve `reservation_ratio * global_limits / num_ops` resources for each
+        # operator.
         default_reserved = global_limits.scale(
-            self._reservation_ratio / len(self._eligible_ops)
+            self._reservation_ratio / (len(eligible_ops))
         )
-        for op in self._eligible_ops:
-            # Make sure the reserved resources are at least to allow
-            # one task.
-            self._op_reserved[op] = default_reserved.max(
-                op.incremental_resource_usage()
+        for op in eligible_ops:
+            # Reserve at least half of the default reserved resources for the outputs.
+            # This makes sure that we will have enough budget to pull the outputs from
+            # the running tasks.
+            self._reserved_for_op_outputs[op] = max(
+                default_reserved.object_store_memory // 2, 1
             )
+            # Calculate the minimum amount of resources to reserve.
+            # 1. Make sure the reserved resources are enough to run at least one task.
+            min_reserved = copy.deepcopy(
+                op.incremental_resource_usage(consider_autoscaling=False)
+            )
+            # 2. To ensure that all GPUs are utilized, reserve enough resource budget
+            # to launch one task for each worker.
+            if (
+                isinstance(op, ActorPoolMapOperator)
+                and op.base_resource_usage().gpu > 0
+            ):
+                min_reserved.object_store_memory *= op._autoscaling_policy.min_workers
+            # Also include `reserved_for_op_outputs`.
+            min_reserved.object_store_memory += self._reserved_for_op_outputs[op]
+            # Total resources we want to reserve for this operator.
+            op_total_reserved = default_reserved.max(min_reserved)
+            if op_total_reserved.satisfies_limit(self._total_shared):
+                # If the remaining resources are enough to reserve `op_total_reserved`,
+                # subtract it from `self._total_shared` and reserve it for this op.
+                self._reserved_min_resources[op] = True
+                self._total_shared = self._total_shared.subtract(op_total_reserved)
+                self._op_reserved[op] = op_total_reserved
+                self._op_reserved[
+                    op
+                ].object_store_memory -= self._reserved_for_op_outputs[op]
+            else:
+                # If the remaining resources are not enough to reserve the minimum
+                # resources for this operator, we'll only reserve the minimum object
+                # store memory, but not the CPU and GPU resources.
+                # Because Ray Core doesn't allow CPU/GPU resources to be oversubscribed.
+                # Note, we reserve minimum resources first for the upstream
+                # ops. Downstream ops need to wait for upstream ops to finish
+                # and release resources.
+                self._reserved_min_resources[op] = False
+                self._op_reserved[op] = ExecutionResources(
+                    0,
+                    0,
+                    min_reserved.object_store_memory
+                    - self._reserved_for_op_outputs[op],
+                )
+                self._total_shared = self._total_shared.subtract(
+                    ExecutionResources(0, 0, min_reserved.object_store_memory)
+                )
+
+        self._total_shared = self._total_shared.max(ExecutionResources.zero())
+
+    def can_submit_new_task(self, op: PhysicalOperator) -> bool:
+        if op not in self._op_budgets:
+            return True
+        budget = self._op_budgets[op]
+        res = op.incremental_resource_usage().satisfies_limit(budget)
+        return res
+
+    def _should_unblock_streaming_output_backpressure(
+        self, op: PhysicalOperator
+    ) -> bool:
+        # In some edge cases, the downstream operators may not have enough resources
+        # to launch tasks. Then we should temporarily unblock the streaming output
+        # backpressure by allowing reading at least 1 block. So the current operator
+        # can finish at least one task and yield resources to the downstream operators.
+        for next_op in op.output_dependencies:
+            if not self._reserved_min_resources[next_op]:
+                # Case 1: the downstream operator hasn't reserved the minimum resources
+                # to run at least one task.
+                return True
+            # Case 2: the downstream operator has reserved the minimum resources, but
+            # the resources are preempted by non-Data tasks or actors.
+            # We don't have a good way to detect this case, so we'll unblock
+            # backpressure when the downstream operator has been idle for a while.
+            if self._idle_detector.detect_idle(next_op):
+                return True
+        return False
+
+    def _op_outputs_reserved_remaining(self, op: PhysicalOperator) -> int:
+        outputs_usage = self._resource_manager._mem_op_outputs[op]
+        return max(self._reserved_for_op_outputs[op] - outputs_usage, 0)
+
+    def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
+        if op not in self._op_budgets:
+            return None
+        res = self._op_budgets[
+            op
+        ].object_store_memory + self._op_outputs_reserved_remaining(op)
+        assert res >= 0
+        if res == 0 and self._should_unblock_streaming_output_backpressure(op):
+            res = 1
+        return res
+
+    def _get_downstream_non_map_op_memory_usage(self, op: PhysicalOperator) -> int:
+        """Get the total memory usage of the downstream non-Map operators."""
+        usage = 0
+        for next_op in op.output_dependencies:
+            if not isinstance(next_op, MapOperator):
+                usage += self._resource_manager.get_op_usage(
+                    next_op
+                ).object_store_memory
+                usage += self._get_downstream_non_map_op_memory_usage(next_op)
+        return usage
 
     def update_usages(self):
-        if len(self._eligible_ops) == 0:
-            return
+        self._update_reservation()
 
-        global_limits = self._resource_manager.get_global_limits()
-        if global_limits != self._cached_global_limits:
-            self._on_global_limits_updated(global_limits)
-            self._cached_global_limits = global_limits
+        self._op_budgets.clear()
+        eligible_ops = self._get_eligible_ops()
+        if len(eligible_ops) == 0:
+            return
 
-        self._op_limits.clear()
         # Remaining of shared resources.
         remaining_shared = self._total_shared
-        for op in self._resource_manager._topology:
-            op_usage = self._resource_manager.get_op_usage(op)
-            if op in self._eligible_ops:
-                op_reserved = self._op_reserved[op]
-                # How much of the reserved resources are remaining.
-                op_reserved_remaining = op_reserved.subtract(op_usage).max(
-                    ExecutionResources.zero()
-                )
-                self._op_limits[op] = op_reserved_remaining
-                # How much of the reserved resources are exceeded.
-                # If exceeded, we need to subtract from the remaining shared resources.
-                op_reserved_exceeded = op_usage.subtract(op_reserved).max(
-                    ExecutionResources.zero()
-                )
-                remaining_shared = remaining_shared.subtract(op_reserved_exceeded)
-            else:
-                # For non-eligible ops, we still need to subtract
-                # their usage from the remaining shared resources.
-                remaining_shared = remaining_shared.subtract(op_usage)
-
-        shared_divided = remaining_shared.max(ExecutionResources.zero()).scale(
-            1.0 / len(self._eligible_ops)
-        )
-        for op in self._eligible_ops:
-            self._op_limits[op] = self._op_limits[op].add(shared_divided)
+        for op in eligible_ops:
+            # Calculate the memory usage of the operator.
+            op_mem_usage = 0
+            # Add the memory usage of the operator itself,
+            # excluding `_reserved_for_op_outputs`.
+            op_mem_usage += self._resource_manager._mem_pending_task_outputs[op]
+            op_mem_usage += max(
+                self._resource_manager._mem_op_outputs[op]
+                - self._reserved_for_op_outputs[op],
+                0,
+            )
+            # Also charge the downstream non-Map operators' memory usage
+            # to the current Map operator.
+            # This is because we don't directly throttle non-Map operators.
+            # So if they are using too much memory, we should throttle their
+            # upstream Map operator.
+            op_mem_usage += self._get_downstream_non_map_op_memory_usage(op)
+            op_usage = copy.deepcopy(self._resource_manager.get_op_usage(op))
+            op_usage.object_store_memory = op_mem_usage
+            op_reserved = self._op_reserved[op]
+            # How much of the reserved resources are remaining.
+            op_reserved_remaining = op_reserved.subtract(op_usage).max(
+                ExecutionResources.zero()
+            )
+            self._op_budgets[op] = op_reserved_remaining
+            # How much of the reserved resources are exceeded.
+            # If exceeded, we need to subtract from the remaining shared resources.
+            op_reserved_exceeded = op_usage.subtract(op_reserved).max(
+                ExecutionResources.zero()
+            )
+            remaining_shared = remaining_shared.subtract(op_reserved_exceeded)
+
+        remaining_shared = remaining_shared.max(ExecutionResources.zero())
+
+        # Allocate the remaining shared resources to each operator.
+        for i, op in enumerate(reversed(eligible_ops)):
+            # By default, divide the remaining shared resources equally.
+            op_shared = remaining_shared.scale(1.0 / (len(eligible_ops) - i))
+            # But if the op's budget is less than `incremental_resource_usage`,
+            # it will be useless. So we'll let the downstream operator
+            # borrow some resources from the upstream operator, if remaining_shared
+            # is still enough.
+            to_borrow = (
+                op.incremental_resource_usage()
+                .subtract(self._op_budgets[op].add(op_shared))
+                .max(ExecutionResources.zero())
+            )
+            if not to_borrow.is_zero() and op_shared.add(to_borrow).satisfies_limit(
+                remaining_shared
+            ):
+                op_shared = op_shared.add(to_borrow)
+            remaining_shared = remaining_shared.subtract(op_shared)
+            assert remaining_shared.is_non_negative(), (
+                remaining_shared,
+                op,
+                op_shared,
+                to_borrow,
+            )
+            self._op_budgets[op] = self._op_budgets[op].add(op_shared)
             # We don't limit GPU resources, as not all operators
             # use GPU resources.
- self._op_limits[op].gpu = float("inf") - - def get_op_limits(self, op: PhysicalOperator) -> ExecutionResources: - if op in self._op_limits: - return self._op_limits[op] - else: - return ExecutionResources.inf() + self._op_budgets[op].gpu = float("inf") diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 3a4949f353df..e78cfc9e07e8 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -253,11 +253,14 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: if DEBUG_TRACE_SCHEDULING: logger.get_logger().info("Scheduling loop step...") + self._resource_manager.update_usages() # Note: calling process_completed_tasks() is expensive since it incurs # ray.wait() overhead, so make sure to allow multiple dispatch per call for # greater parallelism. num_errored_blocks = process_completed_tasks( - topology, self._backpressure_policies, self._max_errored_blocks + topology, + self._resource_manager, + self._max_errored_blocks, ) if self._max_errored_blocks > 0: self._max_errored_blocks -= num_errored_blocks diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 65c83f3be568..d380c9f2967b 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -33,9 +33,7 @@ ) from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.resource_manager import ResourceManager -from ray.data._internal.execution.util import memory_string from ray.data._internal.progress_bar import ProgressBar -from ray.data.context import DataContext logger = DatasetLogger(__name__) @@ -245,8 +243,7 @@ def summary_str(self, resource_manager: ResourceManager) -> str: queued = self.num_queued() + self.op.internal_queue_size() active = self.op.num_active_tasks() desc = f"- {self.op.name}: {active} active, {queued} queued" - mem = memory_string(resource_manager.get_op_usage(self.op).object_store_memory) - desc += f", {mem} objects" + desc += f", [{resource_manager.get_op_usage_str(self.op)}]" suffix = self.op.progress_str() if suffix: desc += f", {suffix}" @@ -359,7 +356,7 @@ def setup_state(op: PhysicalOperator) -> OpState: def process_completed_tasks( topology: Topology, - backpressure_policies: List[BackpressurePolicy], + resource_manager: ResourceManager, max_errored_blocks: int, ) -> int: """Process any newly completed tasks. To update operator @@ -380,17 +377,14 @@ def process_completed_tasks( for task in op.get_active_tasks(): active_tasks[task.get_waitable()] = (state, task) - max_blocks_to_read_per_op: Dict[OpState, int] = {} - for policy in backpressure_policies: - res = policy.calculate_max_blocks_to_read_per_op(topology) - if len(res) > 0: - if len(max_blocks_to_read_per_op) > 0: - raise ValueError( - "At most one backpressure policy that implements " - "calculate_max_blocks_to_read_per_op() can be used at a time." 
- ) - else: - max_blocks_to_read_per_op = res + max_bytes_to_read_per_op: Dict[OpState, int] = {} + if resource_manager.op_resource_allocator_enabled(): + for op, state in topology.items(): + max_bytes_to_read = ( + resource_manager.op_resource_allocator.max_task_output_bytes_to_read(op) + ) + if max_bytes_to_read is not None: + max_bytes_to_read_per_op[state] = max_bytes_to_read # Process completed Ray tasks and notify operators. num_errored_blocks = 0 @@ -404,8 +398,7 @@ def process_completed_tasks( # Organize tasks by the operator they belong to, and sort them by task index. # So that we'll process them in a deterministic order. - # This is because some backpressure policies (e.g., - # StreamingOutputBackpressurePolicy) may limit the number of blocks to read + # This is because OpResourceAllocator may limit the number of blocks to read # per operator. In this case, we want to have fewer tasks finish quickly and # yield resources, instead of having all tasks output blocks together. ready_tasks_by_op = defaultdict(list) @@ -418,11 +411,11 @@ def process_completed_tasks( for task in ready_tasks: if isinstance(task, DataOpTask): try: - num_blocks_read = task.on_data_ready( - max_blocks_to_read_per_op.get(state, None) + bytes_read = task.on_data_ready( + max_bytes_to_read_per_op.get(state, None) ) - if state in max_blocks_to_read_per_op: - max_blocks_to_read_per_op[state] -= num_blocks_read + if state in max_bytes_to_read_per_op: + max_bytes_to_read_per_op[state] -= bytes_read except Exception as e: num_errored_blocks += 1 should_ignore = ( @@ -522,7 +515,12 @@ def select_operator_to_run( # Filter to ops that are eligible for execution. ops = [] for op, state in topology.items(): - under_resource_limits = _execution_allowed(op, resource_manager) + if resource_manager.op_resource_allocator_enabled(): + under_resource_limits = ( + resource_manager.op_resource_allocator.can_submit_new_task(op) + ) + else: + under_resource_limits = _execution_allowed(op, resource_manager) if ( under_resource_limits and not op.completed() @@ -563,13 +561,13 @@ def select_operator_to_run( if not ops: return None - # Run metadata-only operators first. After that, equally penalize outqueue length - # and num bundles processing for backpressure. + # Run metadata-only operators first. After that, choose the operator with the least + # memory usage. return min( ops, key=lambda op: ( not op.throttling_disabled(), - len(topology[op].outqueue) + topology[op].num_processing(), + resource_manager.get_op_usage(op).object_store_memory, ), ) @@ -637,9 +635,9 @@ def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) Returns: Whether the op is allowed to run. """ - if op.throttling_disabled(): return True + global_usage = resource_manager.get_global_usage() global_limits = resource_manager.get_global_limits() @@ -664,9 +662,7 @@ def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) inc_indicator = ExecutionResources( cpu=1 if inc.cpu else 0, gpu=1 if inc.gpu else 0, - object_store_memory=inc.object_store_memory - if DataContext.get_current().use_runtime_metrics_scheduling - else None, + object_store_memory=0, ) # Under global limits; always allow. 
@@ -683,24 +679,9 @@ def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) ) global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory) downstream_memory = resource_manager.get_downstream_object_store_memory(op) - if ( - DataContext.get_current().use_runtime_metrics_scheduling - and inc.object_store_memory - ): - downstream_memory += inc.object_store_memory downstream_limit = global_limits.scale(resource_manager.get_downstream_fraction(op)) downstream_memory_ok = ExecutionResources( object_store_memory=downstream_memory ).satisfies_limit(downstream_limit) - # If completing a task decreases the overall object store memory usage, allow it - # even if we're over the global limit. - if ( - DataContext.get_current().use_runtime_metrics_scheduling - and global_ok_sans_memory - and op.metrics.average_bytes_change_per_task is not None - and op.metrics.average_bytes_change_per_task <= 0 - ): - return True - return global_ok_sans_memory and downstream_memory_ok diff --git a/python/ray/data/context.py b/python/ray/data/context.py index 3734452064f7..ed82394609ee 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -79,11 +79,6 @@ # Whether to use Polars for tabular dataset sorts, groupbys, and aggregations. DEFAULT_USE_POLARS = False -# Whether to use the runtime object store memory metrics for scheduling. -DEFAULT_USE_RUNTIME_METRICS_SCHEDULING = bool( - int(os.environ.get("DEFAULT_USE_RUNTIME_METRICS_SCHEDULING", "0")) -) - # Whether to eagerly free memory (new backend only). DEFAULT_EAGER_FREE = bool(int(os.environ.get("RAY_DATA_EAGER_FREE", "1"))) @@ -143,16 +138,25 @@ # This follows same format as `retry_exceptions` in Ray Core. DEFAULT_ACTOR_TASK_RETRY_ON_ERRORS = False -# Whether to enable ReservationOpResourceLimiter by default. +# Whether to enable ReservationOpResourceAllocator by default. DEFAULT_ENABLE_OP_RESOURCE_RESERVATION = bool( - os.environ.get("RAY_DATA_ENABLE_OP_RESOURCE_RESERVATION", "0") + os.environ.get("RAY_DATA_ENABLE_OP_RESOURCE_RESERVATION", "1") ) -# The default reservation ratio for ReservationOpResourceLimiter. +# The default reservation ratio for ReservationOpResourceAllocator. DEFAULT_OP_RESOURCE_RESERVATION_RATIO = float( os.environ.get("RAY_DATA_OP_RESERVATION_RATIO", "0.5") ) +# Default value of the max number of blocks that can be buffered at the +# streaming generator of each `DataOpTask`. +# Note, if this value is too large, we'll need to allocate more memory +# buffer for the pending task outputs, which may lead to bad performance +# as we may not have enough memory buffer for the operator outputs. +# If the value is too small, the task may be frequently blocked due to +# streaming generator backpressure. 
+DEFAULT_MAX_NUM_BLOCKS_IN_STREAMING_GEN_BUFFER = 2 + @DeveloperAPI class DataContext: @@ -187,7 +191,6 @@ def __init__( use_ray_tqdm: bool, enable_progress_bars: bool, enable_get_object_locations_for_metrics: bool, - use_runtime_metrics_scheduling: bool, write_file_retry_on_errors: List[str], warn_on_driver_memory_usage_bytes: int, actor_task_retry_on_errors: Union[bool, List[BaseException]], @@ -221,7 +224,6 @@ def __init__( self.enable_get_object_locations_for_metrics = ( enable_get_object_locations_for_metrics ) - self.use_runtime_metrics_scheduling = use_runtime_metrics_scheduling self.write_file_retry_on_errors = write_file_retry_on_errors self.warn_on_driver_memory_usage_bytes = warn_on_driver_memory_usage_bytes self.actor_task_retry_on_errors = actor_task_retry_on_errors @@ -243,12 +245,12 @@ def __init__( # the DataContext from the plugin implementations, as well as to avoid # circular dependencies. self._kv_configs: Dict[str, Any] = {} - # The max number of blocks that can be buffered at the streaming generator of - # each `DataOpTask`. - self._max_num_blocks_in_streaming_gen_buffer = None - # Whether to enable ReservationOpResourceLimiter. + self._max_num_blocks_in_streaming_gen_buffer = ( + DEFAULT_MAX_NUM_BLOCKS_IN_STREAMING_GEN_BUFFER + ) + # Whether to enable ReservationOpResourceAllocator. self.op_resource_reservation_enabled = DEFAULT_ENABLE_OP_RESOURCE_RESERVATION - # The reservation ratio for ReservationOpResourceLimiter. + # The reservation ratio for ReservationOpResourceAllocator. self.op_resource_reservation_ratio = DEFAULT_OP_RESOURCE_RESERVATION_RATIO @staticmethod @@ -294,7 +296,6 @@ def get_current() -> "DataContext": use_ray_tqdm=DEFAULT_USE_RAY_TQDM, enable_progress_bars=DEFAULT_ENABLE_PROGRESS_BARS, enable_get_object_locations_for_metrics=DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS, # noqa E501 - use_runtime_metrics_scheduling=DEFAULT_USE_RUNTIME_METRICS_SCHEDULING, # noqa: E501 write_file_retry_on_errors=DEFAULT_WRITE_FILE_RETRY_ON_ERRORS, warn_on_driver_memory_usage_bytes=( DEFAULT_WARN_ON_DRIVER_MEMORY_USAGE_BYTES diff --git a/python/ray/data/tests/test_backpressure_e2e.py b/python/ray/data/tests/test_backpressure_e2e.py new file mode 100644 index 000000000000..f2c030d67b8a --- /dev/null +++ b/python/ray/data/tests/test_backpressure_e2e.py @@ -0,0 +1,276 @@ +import time + +import numpy as np +import pandas as pd +import pytest + +import ray +from ray._private.internal_api import memory_summary +from ray.data.block import BlockMetadata +from ray.data.datasource import Datasource, ReadTask +from ray.data.tests.conftest import restore_data_context # noqa: F401 +from ray.data.tests.conftest import ( + CoreExecutionMetrics, + assert_core_execution_metrics_equals, + get_initial_core_execution_metrics_snapshot, +) +from ray.tests.conftest import shutdown_only # noqa: F401 + + +def test_large_e2e_backpressure_no_spilling( + shutdown_only, restore_data_context # noqa: F811 +): + """Test backpressure can prevent object spilling on a synthetic large-scale + workload.""" + # The cluster has 10 CPUs and 200MB object store memory. + # + # Each produce task generates 10 blocks, each of which has 10MB data. + # In total, there will be 10 * 10 * 10MB = 1000MB intermediate data. + # + # `ReservationOpResourceAllocator` should dynamically allocate resources to each + # operator and prevent object spilling. 
+ NUM_CPUS = 10 + NUM_ROWS_PER_TASK = 10 + NUM_TASKS = 20 + NUM_ROWS_TOTAL = NUM_ROWS_PER_TASK * NUM_TASKS + BLOCK_SIZE = 10 * 1024 * 1024 + object_store_memory = 200 * 1024**2 + print(f"object_store_memory: {object_store_memory/1024/1024}MB") + ray.init(num_cpus=NUM_CPUS, object_store_memory=object_store_memory) + + def produce(batch): + print("Produce task started", batch["id"]) + time.sleep(0.1) + for id in batch["id"]: + print("Producing", id) + yield { + "id": [id], + "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)], + } + + def consume(batch): + print("Consume task started", batch["id"]) + time.sleep(0.01) + return {"id": batch["id"], "result": [0 for _ in batch["id"]]} + + data_context = ray.data.DataContext.get_current() + data_context.execution_options.verbose_progress = True + data_context.target_max_block_size = BLOCK_SIZE + + last_snapshot = get_initial_core_execution_metrics_snapshot() + + ds = ray.data.range(NUM_ROWS_TOTAL, parallelism=NUM_TASKS) + ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK) + ds = ds.map_batches(consume, batch_size=None, num_cpus=0.9) + # Check core execution metrics every 10 rows, because it's expensive. + for _ in ds.iter_batches(batch_size=NUM_ROWS_PER_TASK): + last_snapshot = assert_core_execution_metrics_equals( + CoreExecutionMetrics( + object_store_stats={ + "spilled_bytes_total": 0, + "restored_bytes_total": 0, + }, + ), + last_snapshot, + ) + + +def _build_dataset( + obj_store_limit, + producer_num_cpus, + consumer_num_cpus, + num_blocks, + block_size, +): + # Create a dataset with 2 operators: + # - The producer op has only 1 task, which produces `num_blocks` blocks, each + # of which has `block_size` data. + # - The consumer op has `num_blocks` tasks, each of which consumes 1 block. + ctx = ray.data.DataContext.get_current() + ctx.target_max_block_size = block_size + ctx.execution_options.resource_limits.object_store_memory = obj_store_limit + + def producer(batch): + for i in range(num_blocks): + print("Producing block", i, time.time()) + yield { + "id": [i], + "data": [np.zeros(block_size, dtype=np.uint8)], + } + + def consumer(batch): + assert len(batch["id"]) == 1 + print("Consuming block", batch["id"][0], time.time()) + time.sleep(0.01) + del batch["data"] + return batch + + ds = ray.data.range(1, parallelism=1).materialize() + ds = ds.map_batches(producer, batch_size=None, num_cpus=producer_num_cpus) + ds = ds.map_batches(consumer, batch_size=None, num_cpus=consumer_num_cpus) + return ds + + +@pytest.mark.parametrize( + "cluster_cpus, cluster_obj_store_mem_mb", + [ + (3, 500), # CPU not enough + (4, 100), # Object store memory not enough + (3, 100), # Both not enough + ], +) +def test_no_deadlock_on_small_cluster_resources( + cluster_cpus, + cluster_obj_store_mem_mb, + shutdown_only, # noqa: F811 + restore_data_context, # noqa: F811 +): + """Test when cluster resources are not enough for launching one task per op, + the execution can still proceed without deadlock. 
+    """
+    cluster_obj_store_mem_mb *= 1024**2
+    ray.init(num_cpus=cluster_cpus, object_store_memory=cluster_obj_store_mem_mb)
+    num_blocks = 10
+    block_size = 100 * 1024 * 1024
+    ds = _build_dataset(
+        obj_store_limit=cluster_obj_store_mem_mb // 2,
+        producer_num_cpus=3,
+        consumer_num_cpus=1,
+        num_blocks=num_blocks,
+        block_size=block_size,
+    )
+    assert len(ds.take_all()) == num_blocks
+
+
+def test_no_deadlock_on_resource_contention(
+    shutdown_only, restore_data_context  # noqa: F811
+):
+    """Test when resources are preempted by non-Data code, the execution can
+    still proceed without deadlock."""
+    cluster_obj_store_mem = 1000 * 1024 * 1024
+    ray.init(num_cpus=5, object_store_memory=cluster_obj_store_mem)
+    # Create a non-Data actor that uses 4 CPUs, so only 1 CPU is left for Data.
+    # Currently the Data StreamingExecutor still incorrectly assumes it has all
+    # 5 CPUs. Check that we don't deadlock in this case.
+
+    @ray.remote(num_cpus=4)
+    class DummyActor:
+        def foo(self):
+            return None
+
+    dummy_actor = DummyActor.remote()
+    ray.get(dummy_actor.foo.remote())
+
+    num_blocks = 10
+    block_size = 50 * 1024 * 1024
+    ds = _build_dataset(
+        obj_store_limit=cluster_obj_store_mem // 2,
+        producer_num_cpus=1,
+        consumer_num_cpus=0.9,
+        num_blocks=num_blocks,
+        block_size=block_size,
+    )
+    assert len(ds.take_all()) == num_blocks
+
+
+def test_input_backpressure_e2e(restore_data_context, shutdown_only):  # noqa: F811
+    # Tests that backpressure applies even when reading directly from the input
+    # datasource. This relies on datasource metadata size estimation.
+    @ray.remote
+    class Counter:
+        def __init__(self):
+            self.count = 0
+
+        def increment(self):
+            self.count += 1
+
+        def get(self):
+            return self.count
+
+        def reset(self):
+            self.count = 0
+
+    class CountingRangeDatasource(Datasource):
+        def __init__(self):
+            self.counter = Counter.remote()
+
+        def prepare_read(self, parallelism, n):
+            def range_(i):
+                ray.get(self.counter.increment.remote())
+                return [
+                    pd.DataFrame({"data": np.ones((n // parallelism * 1024 * 1024,))})
+                ]
+
+            sz = (n // parallelism) * 1024 * 1024 * 8
+            print("Block size", sz)
+
+            return [
+                ReadTask(
+                    lambda i=i: range_(i),
+                    BlockMetadata(
+                        num_rows=n // parallelism,
+                        size_bytes=sz,
+                        schema=None,
+                        input_files=None,
+                        exec_stats=None,
+                    ),
+                )
+                for i in range(parallelism)
+            ]
+
+    source = CountingRangeDatasource()
+    ctx = ray.data.DataContext.get_current()
+    ctx.execution_options.resource_limits.object_store_memory = 10e6
+
+    # 10GiB dataset.
+    ds = ray.data.read_datasource(source, n=10000, parallelism=1000)
+    it = iter(ds.iter_batches(batch_size=None, prefetch_batches=0))
+    next(it)
+    time.sleep(3)
+    del it, ds
+    launched = ray.get(source.counter.get.remote())
+
+    # If backpressure is broken we'll launch 15+.
+    assert launched <= 10, launched
+
+
+def test_streaming_backpressure_e2e(restore_data_context):  # noqa: F811
+    # This test case is particularly challenging since there is a large input->output
+    # increase in data size: https://github.com/ray-project/ray/issues/34041
+    class TestSlow:
+        def __call__(self, df: np.ndarray):
+            time.sleep(2)
+            return {"id": np.random.randn(1, 20, 1024, 1024)}
+
+    class TestFast:
+        def __call__(self, df: np.ndarray):
+            time.sleep(0.5)
+            return {"id": np.random.randn(1, 20, 1024, 1024)}
+
+    ctx = ray.init(object_store_memory=4e9)
+    ds = ray.data.range_tensor(20, shape=(3, 1024, 1024), parallelism=20)
+
+    pipe = ds.map_batches(
+        TestFast,
+        batch_size=1,
+        num_cpus=0.5,
+        compute=ray.data.ActorPoolStrategy(size=2),
+    ).map_batches(
+        TestSlow,
+        batch_size=1,
+        compute=ray.data.ActorPoolStrategy(size=1),
+    )
+
+    for batch in pipe.iter_batches(batch_size=1, prefetch_batches=2):
+        ...
+
+    # If backpressure is not working right, we will spill.
+    meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
+    assert "Spilled" not in meminfo, meminfo
+
+
+if __name__ == "__main__":
+    import sys
+
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py
index 376aa6e59a0b..c6a959d4e552 100644
--- a/python/ray/data/tests/test_backpressure_policies.py
+++ b/python/ray/data/tests/test_backpressure_policies.py
@@ -3,28 +3,19 @@
 import time
 import unittest
 from collections import defaultdict
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
 
-import numpy as np
 import pytest
 
 import ray
 from ray.data._internal.execution.backpressure_policy import (
     ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
     ConcurrencyCapBackpressurePolicy,
-    StreamingOutputBackpressurePolicy,
 )
 from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
 from ray.data._internal.execution.operators.task_pool_map_operator import (
     TaskPoolMapOperator,
 )
-from ray.data.tests.conftest import restore_data_context  # noqa: F401
-from ray.data.tests.conftest import (
-    CoreExecutionMetrics,
-    assert_core_execution_metrics_equals,
-    get_initial_core_execution_metrics_snapshot,
-)
-from ray.tests.conftest import shutdown_only  # noqa: F401
 
 
 class TestConcurrencyCapBackpressurePolicy(unittest.TestCase):
@@ -139,297 +130,6 @@ def test_e2e_normal(self):
         assert start1 < start2 < end1 < end2, (start1, start2, end1, end2)
 
 
-class TestStreamOutputBackpressurePolicy(unittest.TestCase):
-    """Tests for StreamOutputBackpressurePolicy."""
-
-    @classmethod
-    def setUpClass(cls):
-        cls._cluster_cpus = 5
-        cls._cluster_object_memory = 500 * 1024 * 1024
-        ray.init(
-            num_cpus=cls._cluster_cpus, object_store_memory=cls._cluster_object_memory
-        )
-        data_context = ray.data.DataContext.get_current()
-        cls._num_blocks = 5
-        cls._block_size = 100 * 1024 * 1024
-        policy_cls = StreamingOutputBackpressurePolicy
-        cls._max_blocks_in_op_output_queue = 1
-        cls._max_blocks_in_generator_buffer = 1
-        cls._configs = {
-            ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY: [policy_cls],
-            policy_cls.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY: (
-                cls._max_blocks_in_op_output_queue
-            ),
-            policy_cls.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY: (
-                cls._max_blocks_in_generator_buffer
-            ),
-        }
-        for k, v in cls._configs.items():
-            data_context.set_config(k, v)
-        data_context.execution_options.preserve_order = True
-
-    @classmethod
-    def tearDownClass(cls):
-        data_context =
ray.data.DataContext.get_current() - for k in cls._configs.keys(): - data_context.remove_config(k) - data_context.execution_options.preserve_order = False - ray.shutdown() - - def _create_mock_op_and_op_state( - self, - name, - outqueue_num_blocks=0, - num_active_tasks=0, - num_task_outputs_generated=0, - ): - op = MagicMock() - op.__str__.return_value = f"Op({name})" - op.num_active_tasks.return_value = num_active_tasks - op.metrics.num_task_outputs_generated = num_task_outputs_generated - - state = MagicMock() - state.__str__.return_value = f"OpState({name})" - state.outqueue_num_blocks.return_value = outqueue_num_blocks - - state.op = op - return op, state - - def test_policy_basic(self): - """Basic unit test for the policy without real execution.""" - up_op, up_state = self._create_mock_op_and_op_state("up") - down_op, down_state = self._create_mock_op_and_op_state("down") - topology = {} - topology[up_op] = up_state - topology[down_op] = down_state - - policy = StreamingOutputBackpressurePolicy(topology) - assert ( - policy._max_num_blocks_in_op_output_queue - == self._max_blocks_in_op_output_queue - ) - data_context = ray.data.DataContext.get_current() - assert ( - data_context._max_num_blocks_in_streaming_gen_buffer - == self._max_blocks_in_generator_buffer - ) - - # Buffers are empty, both ops can read up to the max. - res = policy.calculate_max_blocks_to_read_per_op(topology) - assert res == { - up_state: self._max_blocks_in_op_output_queue, - down_state: self._max_blocks_in_op_output_queue, - } - - # up_op's buffer is full, but down_up has no active tasks. - # We'll still allow up_op to read 1 block. - up_state.outqueue_num_blocks.return_value = self._max_blocks_in_op_output_queue - res = policy.calculate_max_blocks_to_read_per_op(topology) - assert res == { - up_state: 1, - down_state: self._max_blocks_in_op_output_queue, - } - - # down_op now has 1 active task. So we won't allow up_op to read any more. - down_op.num_active_tasks.return_value = 1 - res = policy.calculate_max_blocks_to_read_per_op(topology) - assert res == { - up_state: 0, - down_state: self._max_blocks_in_op_output_queue, - } - - # After `MAX_OUTPUT_IDLE_SECONDS` of no outputs from down_up, - # we'll allow up_op to read 1 block again. - with patch.object( - StreamingOutputBackpressurePolicy, "MAX_OUTPUT_IDLE_SECONDS", 0.1 - ): - time.sleep(0.11) - res = policy.calculate_max_blocks_to_read_per_op(topology) - assert res == { - up_state: 1, - down_state: self._max_blocks_in_op_output_queue, - } - - # down_up now has outputs, so we won't allow up_op to read any more. - down_op.metrics.num_task_outputs_generated = 1 - res = policy.calculate_max_blocks_to_read_per_op(topology) - assert res == { - up_state: 0, - down_state: self._max_blocks_in_op_output_queue, - } - - def _run_dataset(self, producer_num_cpus, consumer_num_cpus): - # Create a dataset with 2 operators: - # - The producer op has only 1 task, which produces 5 blocks, each of which - # has 100MB data. - # - The consumer op has 5 slow tasks, each of which consumes 1 block. - # Return the timestamps at the producer and consumer tasks for each block. 
- num_blocks = self._num_blocks - block_size = self._block_size - ray.data.DataContext.get_current().target_max_block_size = block_size - - def producer(batch): - for i in range(num_blocks): - print("Producing block", i) - yield { - "id": [i], - "data": [np.zeros(block_size, dtype=np.uint8)], - "producer_timestamp": [time.time()], - } - - def consumer(batch): - assert len(batch["id"]) == 1 - print("Consuming block", batch["id"][0]) - batch["consumer_timestamp"] = [time.time()] - time.sleep(0.1) - del batch["data"] - return batch - - ds = ray.data.range(1, parallelism=1).materialize() - ds = ds.map_batches(producer, batch_size=None, num_cpus=producer_num_cpus) - ds = ds.map_batches(consumer, batch_size=None, num_cpus=consumer_num_cpus) - - res = ds.take_all() - assert [row["id"] for row in res] == list(range(self._num_blocks)) - return ( - [row["producer_timestamp"] for row in res], - [row["consumer_timestamp"] for row in res], - ) - - def test_no_deadlock(self): - # The producer needs all 5 CPUs, and the consumer has no CPU to run. - # In this case, we shouldn't backpressure the producer and let it run - # until it finishes. - producer_timestamps, consumer_timestamps = self._run_dataset( - producer_num_cpus=5, consumer_num_cpus=1 - ) - assert producer_timestamps[-1] < consumer_timestamps[0], ( - producer_timestamps, - consumer_timestamps, - ) - - def test_no_deadlock_for_resource_contention(self): - """Test no deadlock in case of resource contention from - non-Data code.""" - # Create a non-Data actor that uses 4 CPUs, only 1 CPU - # is left for Data. Currently Data StreamExecutor still - # incorrectly assumes it has all the 5 CPUs. - # Check that we don't deadlock in this case. - - @ray.remote(num_cpus=4) - class DummyActor: - def foo(self): - return None - - dummy_actor = DummyActor.remote() - ray.get(dummy_actor.foo.remote()) - - producer_timestamps, consumer_timestamps = self._run_dataset( - producer_num_cpus=1, consumer_num_cpus=0.9 - ) - assert producer_timestamps[-1] < consumer_timestamps[0], ( - producer_timestamps, - consumer_timestamps, - ) - - -def test_large_e2e_backpressure(shutdown_only, restore_data_context): # noqa: F811 - """Test backpressure on a synthetic large-scale workload.""" - # The cluster has 10 CPUs and 200MB object store memory. - # The dataset will have 200MB * 25% = 50MB memory budget. - # - # Each produce task generates 10 blocks, each of which has 10MB data. - # In total, there will be 10 * 10 * 10MB = 1000MB intermediate data. - # - # With StreamingOutputBackpressurePolicy and the following configuration, - # the executor will still schedule 10 produce tasks, but only the first task is - # allowed to output all blocks. The total size of pending blocks will be - # (10 + 9 * 1 + 1) * 10MB = 200MB, where - # - 10 is the number of blocks in the first task. - # - 9 * 1 is the number of blocks pending at the streaming generator level of - # the other 9 tasks. - # - 1 is the number of blocks pending at the output queue. - - NUM_CPUS = 10 - NUM_ROWS_PER_TASK = 10 - NUM_TASKS = 20 - NUM_ROWS_TOTAL = NUM_ROWS_PER_TASK * NUM_TASKS - BLOCK_SIZE = 10 * 1024 * 1024 - STREMING_GEN_BUFFER_SIZE = 1 - OP_OUTPUT_QUEUE_SIZE = 1 - max_pending_block_bytes = ( - NUM_ROWS_PER_TASK - + (NUM_CPUS - 1) * STREMING_GEN_BUFFER_SIZE - + OP_OUTPUT_QUEUE_SIZE - ) * BLOCK_SIZE - # Set the object store memory to be slightly larger than the pending data - # size because object store spilling is triggered at 80% capacity. 
- object_store_memory = max_pending_block_bytes / 0.8 + BLOCK_SIZE - print(f"max_pending_block_bytes: {max_pending_block_bytes/1024/1024}MB") - print(f"object_store_memory: {object_store_memory/1024/1024}MB") - - ray.init(num_cpus=NUM_CPUS, object_store_memory=object_store_memory) - - def produce(batch): - print("Produce task started", batch["id"]) - time.sleep(0.1) - for id in batch["id"]: - print("Producing", id) - yield { - "id": [id], - "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)], - } - - def consume(batch): - print("Consume task started", batch["id"]) - time.sleep(0.01) - return {"id": batch["id"], "result": [0 for _ in batch["id"]]} - - data_context = ray.data.DataContext.get_current() - data_context.set_config( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, - [ - StreamingOutputBackpressurePolicy, - ], - ) - data_context.set_config( - StreamingOutputBackpressurePolicy.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY, - OP_OUTPUT_QUEUE_SIZE, - ) - data_context.set_config( - StreamingOutputBackpressurePolicy.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, - STREMING_GEN_BUFFER_SIZE, - ) - data_context.execution_options.verbose_progress = True - data_context.target_max_block_size = BLOCK_SIZE - - last_snapshot = get_initial_core_execution_metrics_snapshot() - - ds = ray.data.range(NUM_ROWS_TOTAL, parallelism=NUM_TASKS) - ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK) - ds = ds.map_batches(consume, batch_size=None, num_cpus=0.9) - # Check core execution metrics every 10 rows, because it's expensive. - for _ in ds.iter_batches(batch_size=NUM_ROWS_PER_TASK): - # The amount of generated data should be less than - # max_pending_block_bytes (pending data) + - # NUM_ROWS_PER_TASK * BLOCK_SIZE (consumed data) - max_created_bytes_per_consumption = ( - max_pending_block_bytes + NUM_ROWS_PER_TASK * BLOCK_SIZE - ) - - last_snapshot = assert_core_execution_metrics_equals( - CoreExecutionMetrics( - object_store_stats={ - "spilled_bytes_total": 0, - "restored_bytes_total": 0, - "cumulative_created_plasma_bytes": lambda x: x - <= 1.5 * max_created_bytes_per_consumption, - }, - ), - last_snapshot, - ) - - if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/test_dynamic_block_split.py b/python/ray/data/tests/test_dynamic_block_split.py index e24782d49142..fd91863804a0 100644 --- a/python/ray/data/tests/test_dynamic_block_split.py +++ b/python/ray/data/tests/test_dynamic_block_split.py @@ -125,8 +125,8 @@ def test_bulk_lazy_eval_split_mode(shutdown_only, block_split, tmp_path): ray.data.range(8, parallelism=8).write_csv(str(tmp_path)) if not block_split: - # Setting infinite block size effectively disables block splitting. - ctx.target_max_block_size = float("inf") + # Setting a huge block size effectively disables block splitting. 
+ ctx.target_max_block_size = 2**64 ds = ray.data.read_datasource(SlowCSVDatasource(str(tmp_path)), parallelism=8) start = time.time() diff --git a/python/ray/data/tests/test_executor_resource_management.py b/python/ray/data/tests/test_executor_resource_management.py index 9fbc35c7fc29..1b584f63af0b 100644 --- a/python/ray/data/tests/test_executor_resource_management.py +++ b/python/ray/data/tests/test_executor_resource_management.py @@ -55,7 +55,16 @@ def test_resource_canonicalization(ray_start_10_cpus_shared): compute_strategy=TaskPoolStrategy(), ) assert op.base_resource_usage() == ExecutionResources() - assert op.incremental_resource_usage() == ExecutionResources(cpu=1, gpu=0) + data_context = ray.data.DataContext.get_current() + inc_obj_store_mem = ( + data_context._max_num_blocks_in_streaming_gen_buffer + * data_context.target_max_block_size + ) + assert op.incremental_resource_usage() == ExecutionResources( + cpu=1, + gpu=0, + object_store_memory=inc_obj_store_mem, + ) assert op._ray_remote_args == {"num_cpus": 1} op = MapOperator.create( @@ -66,7 +75,9 @@ def test_resource_canonicalization(ray_start_10_cpus_shared): ray_remote_args={"num_gpus": 2}, ) assert op.base_resource_usage() == ExecutionResources() - assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=2) + assert op.incremental_resource_usage() == ExecutionResources( + cpu=0, gpu=2, object_store_memory=inc_obj_store_mem + ) assert op._ray_remote_args == {"num_gpus": 2} with pytest.raises(ValueError): @@ -204,10 +215,17 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co ) op.start(ExecutionOptions()) + data_context = ray.data.DataContext.get_current() + inc_obj_store_mem = ( + data_context._max_num_blocks_in_streaming_gen_buffer + * data_context.target_max_block_size + ) assert op.base_resource_usage() == ExecutionResources(cpu=2, gpu=0) # All actors are idle (pending creation), therefore shouldn't need to scale up when # submitting a new task, so incremental resource usage should be 0. - assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=0) + assert op.incremental_resource_usage() == ExecutionResources( + cpu=0, gpu=0, object_store_memory=inc_obj_store_mem + ) assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0) assert op.metrics.obj_store_mem_internal_inqueue == 0 assert op.metrics.obj_store_mem_internal_outqueue == 0 @@ -218,7 +236,9 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co for i in range(4): # Pool is still idle while waiting for actors to start, so additional tasks # shouldn't trigger scale-up, so incremental resource usage should still be 0. 
- assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=0) + assert op.incremental_resource_usage() == ExecutionResources( + cpu=0, gpu=0, object_store_memory=inc_obj_store_mem + ) op.add_input(input_op.get_next(), 0) assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0) assert op.metrics.obj_store_mem_internal_inqueue == pytest.approx( @@ -290,10 +310,17 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared): ) op.start(ExecutionOptions()) + data_context = ray.data.DataContext.get_current() + inc_obj_store_mem = ( + data_context._max_num_blocks_in_streaming_gen_buffer + * data_context.target_max_block_size + ) assert op.base_resource_usage() == ExecutionResources(cpu=2, gpu=0) # All actors are idle (pending creation), therefore shouldn't need to scale up when # submitting a new task, so incremental resource usage should be 0. - assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=0) + assert op.incremental_resource_usage() == ExecutionResources( + cpu=0, gpu=0, object_store_memory=inc_obj_store_mem + ) assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0) assert op.metrics.obj_store_mem_internal_inqueue == 0 assert op.metrics.obj_store_mem_internal_outqueue == 0 @@ -304,7 +331,9 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared): for i in range(4): # Pool is still idle while waiting for actors to start, so additional tasks # shouldn't trigger scale-up, so incremental resource usage should still be 0. - assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=0) + assert op.incremental_resource_usage() == ExecutionResources( + cpu=0, gpu=0, object_store_memory=inc_obj_store_mem + ) op.add_input(input_op.get_next(), 0) assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0) assert op.metrics.obj_store_mem_internal_inqueue == pytest.approx( diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index 942c7094b3e0..657985a3b706 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -14,7 +14,7 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.union_operator import UnionOperator from ray.data._internal.execution.resource_manager import ( - ReservationOpResourceLimiter, + ReservationOpResourceAllocator, ResourceManager, ) from ray.data._internal.execution.streaming_executor_state import ( @@ -23,7 +23,26 @@ from ray.data._internal.execution.util import make_ref_bundles from ray.data.context import DataContext from ray.data.tests.conftest import * # noqa -from ray.data.tests.test_streaming_executor import make_map_transformer + + +def mock_map_op( + input_op, + ray_remote_args=None, + compute_strategy=None, + incremental_resource_usage=None, +): + op = MapOperator.create( + MagicMock(), + input_op, + ray_remote_args=ray_remote_args or {}, + compute_strategy=compute_strategy, + ) + op.start = MagicMock(side_effect=lambda _: None) + if incremental_resource_usage is not None: + op.incremental_resource_usage = MagicMock( + return_value=incremental_resource_usage + ) + return op class TestResourceManager: @@ -111,50 +130,108 @@ def test_global_limits_cache(self): assert resource_manager.get_global_limits() == expected_resource assert ray_cluster_resources.call_count == 2 - def test_calculating_usage(self): - inputs = make_ref_bundles([[x] for x in 
range(20)]) - o1 = InputDataBuffer(inputs) - o2 = MapOperator.create( - make_map_transformer(lambda block: [b * -1 for b in block]), o1 - ) - o3 = MapOperator.create( - make_map_transformer(lambda block: [b * 2 for b in block]), o2 - ) - o2.current_processor_usage = MagicMock( - return_value=ExecutionResources(cpu=5, gpu=0) - ) - o2.metrics.obj_store_mem_internal_outqueue = 500 - o3.current_processor_usage = MagicMock( - return_value=ExecutionResources(cpu=10, gpu=0) - ) - o3.metrics.obj_store_mem_internal_outqueue = 1000 + def test_update_usage(self): + """Test calculating op_usage.""" + o1 = InputDataBuffer([]) + o2 = mock_map_op(o1) + o3 = mock_map_op(o2) topo, _ = build_streaming_topology(o3, ExecutionOptions()) - inputs[0].size_bytes = MagicMock(return_value=200) - topo[o2].add_output(inputs[0]) + + # Mock different metrics that contribute to the resource usage. + mock_cpu = { + o1: 0, + o2: 5, + o3: 8, + } + mock_pending_task_outputs = { + o1: 0, + o2: 100, + o3: 200, + } + mock_internal_outqueue = { + o1: 0, + o2: 300, + o3: 400, + } + mock_external_outqueue_sizes = { + o1: 100, + o2: 500, + o3: 600, + } + mock_internal_inqueue = { + o1: 0, + o2: 700, + o3: 800, + } + mock_pending_task_inputs = { + o1: 0, + o2: 900, + o3: 1000, + } + + for op in [o1, o2, o3]: + op.current_processor_usage = MagicMock( + return_value=ExecutionResources(cpu=mock_cpu[op], gpu=0) + ) + op._metrics = MagicMock( + obj_store_mem_pending_task_outputs=mock_pending_task_outputs[op], + obj_store_mem_internal_outqueue=mock_internal_outqueue[op], + obj_store_mem_internal_inqueue=mock_internal_inqueue[op], + obj_store_mem_pending_task_inputs=mock_pending_task_inputs[op], + ) + ref_bundle = MagicMock( + size_bytes=MagicMock(return_value=mock_external_outqueue_sizes[op]) + ) + topo[op].add_output(ref_bundle) resource_manager = ResourceManager(topo, ExecutionOptions()) + resource_manager._op_resource_allocator = None resource_manager.update_usages() - assert resource_manager.get_global_usage() == ExecutionResources(15, 0, 1700) - - assert resource_manager.get_op_usage(o1) == ExecutionResources(0, 0, 0) - assert resource_manager.get_op_usage(o2) == ExecutionResources(5, 0, 700) - assert resource_manager.get_op_usage(o3) == ExecutionResources(10, 0, 1000) - - assert resource_manager.get_downstream_fraction(o1) == 1.0 - assert resource_manager.get_downstream_fraction(o2) == 1.0 - assert resource_manager.get_downstream_fraction(o3) == 0.5 - assert resource_manager.get_downstream_object_store_memory(o1) == 1700 - assert resource_manager.get_downstream_object_store_memory(o2) == 1700 - assert resource_manager.get_downstream_object_store_memory(o3) == 1000 + global_cpu = 0 + global_mem = 0 + for op in [o1, o2, o3]: + if op == o1: + # Resource usage of InputDataBuffer doesn't count. 
+ expected_mem = 0 + else: + expected_mem = ( + mock_pending_task_outputs[op] + + mock_internal_outqueue[op] + + mock_external_outqueue_sizes[op] + ) + for next_op in op.output_dependencies: + expected_mem += ( + +mock_internal_inqueue[next_op] + + mock_pending_task_inputs[next_op] + ) + op_usage = resource_manager.get_op_usage(op) + assert op_usage.cpu == mock_cpu[op] + assert op_usage.gpu == 0 + assert op_usage.object_store_memory == expected_mem + if op != o1: + assert ( + resource_manager._mem_pending_task_outputs[op] + == mock_pending_task_outputs[op] + ) + assert ( + resource_manager._mem_op_outputs[op] + == expected_mem - mock_pending_task_outputs[op] + ) + global_cpu += mock_cpu[op] + global_mem += expected_mem + + assert resource_manager.get_global_usage() == ExecutionResources( + global_cpu, 0, global_mem + ) def test_object_store_usage(self, restore_data_context): input = make_ref_bundles([[x] for x in range(1)])[0] input.size_bytes = MagicMock(return_value=1) o1 = InputDataBuffer([input]) - o2 = MapOperator.create(MagicMock(), o1) - o3 = MapOperator.create(MagicMock(), o2) + o2 = mock_map_op(o1) + o3 = mock_map_op(o2) topo, _ = build_streaming_topology(o3, ExecutionOptions()) resource_manager = ResourceManager(topo, ExecutionOptions()) @@ -229,19 +306,21 @@ def test_object_store_usage(self, restore_data_context): assert resource_manager.get_op_usage(o3).object_store_memory == 1 -class TestReservationOpResourceLimiter: - """Tests for ReservationOpResourceLimiter.""" +class TestReservationOpResourceAllocator: + """Tests for ReservationOpResourceAllocator.""" def test_basic(self, restore_data_context): DataContext.get_current().op_resource_reservation_enabled = True DataContext.get_current().op_resource_reservation_ratio = 0.5 o1 = InputDataBuffer([]) - o2 = MapOperator.create(MagicMock(), o1) - o3 = MapOperator.create(MagicMock(), o2) + o2 = mock_map_op(o1, incremental_resource_usage=ExecutionResources(1, 0, 15)) + o3 = mock_map_op(o2, incremental_resource_usage=ExecutionResources(1, 0, 10)) o4 = LimitOperator(1, o3) op_usages = {op: ExecutionResources.zero() for op in [o1, o2, o3, o4]} + pending_task_outputs_usages = {op: 0 for op in [o1, o2, o3, o4]} + op_outputs_usages = {op: 0 for op in [o1, o2, o3, o4]} topo, _ = build_streaming_topology(o4, ExecutionOptions()) @@ -253,93 +332,129 @@ def mock_get_global_limits(): resource_manager = ResourceManager(topo, ExecutionOptions()) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) + resource_manager._mem_pending_task_outputs = pending_task_outputs_usages + resource_manager._mem_op_outputs = op_outputs_usages + resource_manager.get_global_limits = MagicMock( side_effect=mock_get_global_limits ) - assert resource_manager.op_resource_limiter_enabled() - op_resource_limiter = resource_manager._op_resource_limiter - assert isinstance(op_resource_limiter, ReservationOpResourceLimiter) + assert resource_manager.op_resource_allocator_enabled() + allocator = resource_manager._op_resource_allocator + assert isinstance(allocator, ReservationOpResourceAllocator) # Test initial state when no resources are used. 
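
A minimal, self-contained sketch of the per-operator memory accounting that `test_update_usage` encodes; the function and dictionary names here are illustrative rather than the `ResourceManager` API, and the numbers mirror the mocked metrics for `o2` and `o3` above:

def op_object_store_usage(op, metrics, outqueue_bytes, downstream, input_ops):
    """Rough reconstruction of how an operator is charged object store memory."""
    if op in input_ops:
        # InputDataBuffer usage doesn't count.
        return 0
    # The op is charged for its pending task outputs and its own output queues...
    usage = (
        metrics[op]["pending_task_outputs"]
        + metrics[op]["internal_outqueue"]
        + outqueue_bytes[op]
    )
    # ...plus the input-side buffers of its downstream operators.
    for next_op in downstream.get(op, []):
        usage += (
            metrics[next_op]["internal_inqueue"]
            + metrics[next_op]["pending_task_inputs"]
        )
    return usage


# Mirrors the mocked values above: o2 is charged 100 + 300 + 500 for itself,
# plus 800 + 1000 for its downstream op o3; o3 has no downstream op.
metrics = {
    "o2": {"pending_task_outputs": 100, "internal_outqueue": 300,
           "internal_inqueue": 700, "pending_task_inputs": 900},
    "o3": {"pending_task_outputs": 200, "internal_outqueue": 400,
           "internal_inqueue": 800, "pending_task_inputs": 1000},
}
outqueue_bytes = {"o2": 500, "o3": 600}
downstream = {"o2": ["o3"]}
assert op_object_store_usage("o2", metrics, outqueue_bytes, downstream, {"o1"}) == 2700
assert op_object_store_usage("o3", metrics, outqueue_bytes, downstream, {"o1"}) == 1200
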
global_limits = ExecutionResources(cpu=16, gpu=0, object_store_memory=1000) - op_resource_limiter.update_usages() - assert o1 not in op_resource_limiter._op_reserved - assert o4 not in op_resource_limiter._op_reserved - assert op_resource_limiter._op_reserved[o2] == ExecutionResources(4, 0, 250) - assert op_resource_limiter._op_reserved[o3] == ExecutionResources(4, 0, 250) - assert op_resource_limiter._total_shared == ExecutionResources(8, 0, 500) - - assert op_resource_limiter.get_op_limits(o1) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o4) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o2) == ExecutionResources( - 8, float("inf"), 500 - ) - assert op_resource_limiter.get_op_limits(o3) == ExecutionResources( - 8, float("inf"), 500 - ) + allocator.update_usages() + # +-----+------------------+------------------+--------------+ + # | | _op_reserved | _reserved_for | used shared | + # | | (used/remaining) | _op_outputs | resources | + # | | | (used/remaining) | | + # +-----+------------------+------------------+--------------+ + # | op2 | 0/125 | 0/125 | 0 | + # +-----+------------------+------------------+--------------+ + # | op3 | 0/125 | 0/125 | 0 | + # +-----+------------------+------------------+--------------+ + # o1 and o4 are not handled. + assert o1 not in allocator._op_reserved + assert o4 not in allocator._op_reserved + assert o1 not in allocator._op_budgets + assert o4 not in allocator._op_budgets + # Test reserved resources for o2 and o3. + assert allocator._op_reserved[o2] == ExecutionResources(4, 0, 125) + assert allocator._op_reserved[o3] == ExecutionResources(4, 0, 125) + assert allocator._reserved_for_op_outputs[o2] == 125 + assert allocator._reserved_for_op_outputs[o3] == 125 + # 50% of the global limits are shared. + assert allocator._total_shared == ExecutionResources(8, 0, 500) + # Test budgets. + assert allocator._op_budgets[o2] == ExecutionResources(8, float("inf"), 375) + assert allocator._op_budgets[o3] == ExecutionResources(8, float("inf"), 375) + # Test can_submit_new_task and max_task_output_bytes_to_read. + assert allocator.can_submit_new_task(o2) + assert allocator.can_submit_new_task(o3) + assert allocator.max_task_output_bytes_to_read(o2) == 500 + assert allocator.max_task_output_bytes_to_read(o3) == 500 # Test when each operator uses some resources. 
op_usages[o2] = ExecutionResources(6, 0, 500) + pending_task_outputs_usages[o2] = 400 + op_outputs_usages[o2] = 100 op_usages[o3] = ExecutionResources(2, 0, 125) + pending_task_outputs_usages[o3] = 30 + op_outputs_usages[o3] = 25 op_usages[o4] = ExecutionResources(0, 0, 50) - op_resource_limiter.update_usages() - assert op_resource_limiter.get_op_limits(o1) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o4) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o2) == ExecutionResources( - 3, float("inf"), 100 - ) - assert op_resource_limiter.get_op_limits(o3) == ExecutionResources( - 5, float("inf"), 225 - ) + allocator.update_usages() + # +-----+------------------+------------------+--------------+ + # | | _op_reserved | _reserved_for | used shared | + # | | (used/remaining) | _op_outputs | resources | + # | | | (used/remaining) | | + # +-----+------------------+------------------+--------------+ + # | op2 | 125/0 | 100/25 | 400-125=275 | + # +-----+------------------+------------------+--------------+ + # | op3 | (30+50)/45 | 25/100 | 0 | + # +-----+------------------+------------------+--------------+ + # remaining shared = 1000/2 - 275 = 225 + # Test budgets. + # memory_budget[o2] = 0 + 225/2 = 112.5 + assert allocator._op_budgets[o2] == ExecutionResources(3, float("inf"), 112.5) + # memory_budget[o3] = 45 + 225/2 = 157.5 + assert allocator._op_budgets[o3] == ExecutionResources(5, float("inf"), 157.5) + # Test can_submit_new_task and max_task_output_bytes_to_read. + assert allocator.can_submit_new_task(o2) + assert allocator.can_submit_new_task(o3) + # max_task_output_bytes_to_read(o2) = 112.5 + 25 = 137.5 + assert allocator.max_task_output_bytes_to_read(o2) == 137.5 + # max_task_output_bytes_to_read(o3) = 157.5 + 100 = 257.5 + assert allocator.max_task_output_bytes_to_read(o3) == 257.5 # Test global_limits updated. global_limits = ExecutionResources(cpu=12, gpu=0, object_store_memory=800) - op_resource_limiter.update_usages() - assert o1 not in op_resource_limiter._op_reserved - assert o4 not in op_resource_limiter._op_reserved - assert op_resource_limiter._op_reserved[o2] == ExecutionResources(3, 0, 200) - assert op_resource_limiter._op_reserved[o3] == ExecutionResources(3, 0, 200) - assert op_resource_limiter._total_shared == ExecutionResources(6, 0, 400) - - assert op_resource_limiter.get_op_limits(o1) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o4) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o2) == ExecutionResources( - 1.5, float("inf"), 25 - ) - assert op_resource_limiter.get_op_limits(o3) == ExecutionResources( - 2.5, float("inf"), 100 - ) - - # Test global_limits exceeded. - op_usages[o4] = ExecutionResources(0, 0, 150) - op_resource_limiter.update_usages() - assert op_resource_limiter.get_op_limits(o1) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o4) == ExecutionResources.inf() - assert op_resource_limiter.get_op_limits(o2) == ExecutionResources( - 1.5, float("inf"), 0 - ) - # o3 still has object_store_memory in its reserved resources, - # even if the global limits are already exceeded. 
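
The budget tables in the comments above follow one consistent rule. Below is a simplified sketch of that arithmetic, covering only the memory dimension and assuming the default 50% reservation ratio; the names are illustrative, and the real `ReservationOpResourceAllocator` additionally enforces minimum reservations such as each op's `incremental_resource_usage()`:

def memory_budgets(global_mem, ops, usage_non_output, usage_output, ratio=0.5):
    """Sketch of the reservation math implied by the test comments above."""
    # Each eligible op reserves global_mem * ratio / len(ops), split evenly
    # between in-flight task outputs ("op reserved") and the op's output queue.
    reserved_each = global_mem * ratio / len(ops) / 2
    shared = global_mem * (1 - ratio)
    # Usage beyond an op's reservation eats into the shared pool.
    shared -= sum(max(0, usage_non_output[op] - reserved_each) for op in ops)
    budgets, max_read = {}, {}
    for op in ops:
        # Remaining reserved memory plus an even share of the remaining pool.
        budgets[op] = max(0, reserved_each - usage_non_output[op]) + shared / len(ops)
        # Reading task outputs may additionally use the op's output reservation.
        max_read[op] = budgets[op] + max(0, reserved_each - usage_output[op])
    return budgets, max_read


# Reproduces the second table above; o3's 80 is the (30 + 50) shown there.
budgets, max_read = memory_budgets(
    1000, ["o2", "o3"], {"o2": 400, "o3": 80}, {"o2": 100, "o3": 25}
)
assert budgets == {"o2": 112.5, "o3": 157.5}
assert max_read == {"o2": 137.5, "o3": 257.5}
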
- assert op_resource_limiter.get_op_limits(o3) == ExecutionResources( - 2.5, float("inf"), 75 - ) - - def test_reserve_at_least_incremental_resource_usage(self, restore_data_context): + allocator.update_usages() + # +-----+------------------+------------------+--------------+ + # | | _op_reserved | _reserved_for | used shared | + # | | (used/remaining) | _op_outputs | resources | + # | | | (used/remaining) | | + # +-----+------------------+------------------+--------------+ + # | op2 | 100/0 | 100/0 | 400-100=300 | + # +-----+------------------+------------------+--------------+ + # | op3 | (30+50)/20 | 25/75 | 0 | + # +-----+------------------+------------------+--------------+ + # remaining shared = 800/2 - 300 = 100 + # Test reserved resources for o2 and o3. + assert allocator._op_reserved[o2] == ExecutionResources(3, 0, 100) + assert allocator._op_reserved[o3] == ExecutionResources(3, 0, 100) + assert allocator._reserved_for_op_outputs[o2] == 100 + assert allocator._reserved_for_op_outputs[o3] == 100 + # 50% of the global limits are shared. + assert allocator._total_shared == ExecutionResources(6, 0, 400) + + # Test budgets. + # memory_budget[o2] = 0 + 100/2 = 50 + assert allocator._op_budgets[o2] == ExecutionResources(1.5, float("inf"), 50) + # memory_budget[o3] = 20 + 100/2 = 70 + assert allocator._op_budgets[o3] == ExecutionResources(2.5, float("inf"), 70) + # Test can_submit_new_task and max_task_output_bytes_to_read. + assert allocator.can_submit_new_task(o2) + assert allocator.can_submit_new_task(o3) + # max_task_output_bytes_to_read(o2) = 50 + 0 = 50 + assert allocator.max_task_output_bytes_to_read(o2) == 50 + # max_task_output_bytes_to_read(o3) = 70 + 75 = 145 + assert allocator.max_task_output_bytes_to_read(o3) == 145 + + def test_reserve_incremental_resource_usage(self, restore_data_context): """Test that we'll reserve at least incremental_resource_usage() for each operator.""" DataContext.get_current().op_resource_reservation_enabled = True DataContext.get_current().op_resource_reservation_ratio = 0.5 - global_limits = ExecutionResources(cpu=4, gpu=0, object_store_memory=1000) - incremental_usage = ExecutionResources(cpu=3, gpu=0, object_store_memory=600) + global_limits = ExecutionResources(cpu=6, gpu=0, object_store_memory=1600) + incremental_usage = ExecutionResources(cpu=3, gpu=0, object_store_memory=500) o1 = InputDataBuffer([]) - o2 = MapOperator.create(MagicMock(), o1) - o2.incremental_resource_usage = MagicMock(return_value=incremental_usage) - o3 = MapOperator.create(MagicMock(), o2) - o3.incremental_resource_usage = MagicMock(return_value=incremental_usage) + o2 = mock_map_op(o1, incremental_resource_usage=incremental_usage) + o3 = mock_map_op(o2, incremental_resource_usage=incremental_usage) topo, _ = build_streaming_topology(o3, ExecutionOptions()) resource_manager = ResourceManager(topo, ExecutionOptions()) @@ -348,26 +463,53 @@ def test_reserve_at_least_incremental_resource_usage(self, restore_data_context) ) resource_manager.get_global_limits = MagicMock(return_value=global_limits) - op_resource_limiter = resource_manager._op_resource_limiter - assert isinstance(op_resource_limiter, ReservationOpResourceLimiter) + allocator = resource_manager._op_resource_allocator + assert isinstance(allocator, ReservationOpResourceAllocator) - op_resource_limiter.update_usages() - assert op_resource_limiter._op_reserved[o2] == incremental_usage - assert op_resource_limiter._op_reserved[o3] == incremental_usage + allocator.update_usages() + assert allocator._op_reserved[o2] 
== incremental_usage + assert allocator._op_reserved[o3] == incremental_usage + assert allocator._reserved_for_op_outputs[o2] == 200 + assert allocator._reserved_for_op_outputs[o2] == 200 - assert op_resource_limiter.get_op_limits(o2) == ExecutionResources( - 4, float("inf"), 850 + def test_reserve_min_resources_for_gpu_ops(self, restore_data_context): + """Test that we'll reserve enough resources for ActorPoolMapOperator + that uses GPU.""" + DataContext.get_current().op_resource_reservation_enabled = True + DataContext.get_current().op_resource_reservation_ratio = 0.5 + + global_limits = ExecutionResources(cpu=6, gpu=0, object_store_memory=1600) + incremental_usage = ExecutionResources(cpu=0, gpu=0, object_store_memory=100) + + o1 = InputDataBuffer([]) + o2 = mock_map_op( + o1, + ray_remote_args={"num_cpus": 0, "num_gpus": 1}, + compute_strategy=ray.data.ActorPoolStrategy(size=8), + incremental_resource_usage=incremental_usage, ) - assert op_resource_limiter.get_op_limits(o3) == ExecutionResources( - 4, float("inf"), 850 + topo, _ = build_streaming_topology(o2, ExecutionOptions()) + + resource_manager = ResourceManager(topo, ExecutionOptions()) + resource_manager.get_op_usage = MagicMock( + return_value=ExecutionResources.zero() ) + resource_manager.get_global_limits = MagicMock(return_value=global_limits) - def test_no_eligible_ops(self, restore_data_context): + allocator = resource_manager._op_resource_allocator + assert isinstance(allocator, ReservationOpResourceAllocator) + + allocator.update_usages() + assert allocator._op_reserved[o2].object_store_memory == 800 + + def test_only_handle_eligible_ops(self, restore_data_context): + """Test that we only handle non-completed map ops.""" DataContext.get_current().op_resource_reservation_enabled = True o1 = InputDataBuffer([]) - o2 = LimitOperator(1, o1) - topo, _ = build_streaming_topology(o2, ExecutionOptions()) + o2 = mock_map_op(o1) + o3 = LimitOperator(1, o2) + topo, _ = build_streaming_topology(o3, ExecutionOptions()) resource_manager = ResourceManager(topo, ExecutionOptions()) resource_manager.get_op_usage = MagicMock( @@ -377,31 +519,36 @@ def test_no_eligible_ops(self, restore_data_context): return_value=ExecutionResources.zero() ) - assert resource_manager.op_resource_limiter_enabled() - op_resource_limiter = resource_manager._op_resource_limiter - assert isinstance(op_resource_limiter, ReservationOpResourceLimiter) + assert resource_manager.op_resource_allocator_enabled() + allocator = resource_manager._op_resource_allocator + assert isinstance(allocator, ReservationOpResourceAllocator) + + allocator.update_usages() + assert o2 in allocator._op_budgets + assert o3 not in allocator._op_budgets - op_resource_limiter.update_usages() - assert op_resource_limiter.get_op_limits(o1) == ExecutionResources.inf() + o2.completed = MagicMock(return_value=True) + allocator.update_usages() + assert o2 not in allocator._op_budgets def test_only_enable_for_ops_with_accurate_memory_accouting( self, restore_data_context ): - """Test that ReservationOpResourceLimiter is not enabled when + """Test that ReservationOpResourceAllocator is not enabled when there are ops not in ResourceManager._ACCURRATE_MEMORY_ACCOUNTING_OPS """ DataContext.get_current().op_resource_reservation_enabled = True o1 = InputDataBuffer([]) - o2 = MapOperator.create(MagicMock(), o1) + o2 = mock_map_op(o1) o3 = InputDataBuffer([]) - o4 = MapOperator.create(MagicMock(), o3) + o4 = mock_map_op(o3) o3 = UnionOperator(o2, o4) topo, _ = build_streaming_topology(o3, 
ExecutionOptions()) resource_manager = ResourceManager(topo, ExecutionOptions()) - assert not resource_manager.op_resource_limiter_enabled() + assert not resource_manager.op_resource_allocator_enabled() if __name__ == "__main__": diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py deleted file mode 100644 index 12df40a5ffd9..000000000000 --- a/python/ray/data/tests/test_runtime_metrics_scheduling.py +++ /dev/null @@ -1,43 +0,0 @@ -import time - -import numpy as np -import pytest - -import ray -from ray._private.internal_api import memory_summary -from ray.data._internal.execution.backpressure_policy import ( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, - StreamingOutputBackpressurePolicy, -) - - -def test_scheduler_accounts_for_in_flight_tasks(shutdown_only, restore_data_context): - # The executor launches multiple tasks in each scheduling step. If it doesn't - # account for the potential output of in flight tasks, it may launch too many tasks - # and cause spilling. - ctx = ray.init(object_store_memory=100 * 1024**2) - - ray.data.DataContext.get_current().use_runtime_metrics_scheduling = True - ray.data.DataContext.get_current().set_config( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [StreamingOutputBackpressurePolicy] - ) - - def f(batch): - time.sleep(0.1) - return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)} - - # If the executor doesn't account for the potential output of in flight tasks, it - # will launch all 8 tasks at once, producing 8 * 24MiB = 192MiB > 100MiB of data. - ds = ray.data.range(8, parallelism=8).map_batches(f, batch_size=None) - - for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"): - pass - - meminfo = memory_summary(ctx.address_info["address"], stats_only=True) - assert "Spilled" not in meminfo, meminfo - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index d6fd9ee03bc0..06795caa8305 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -109,8 +109,8 @@ def gen(name): assert 80 < x < 120, (x, nrow) # Disabled. - # Setting infinite block size effectively disables block splitting. - ctx.target_max_block_size = float("inf") + # Setting a huge block size effectively disables block splitting. + ctx.target_max_block_size = 2**64 ds4 = gen("out4") assert ds4._block_num_rows() == [1000] @@ -194,8 +194,8 @@ def __call__(self, x): assert 4 < nblocks < 7 or use_actors, nblocks # Disabled. - # Setting infinite block size effectively disables block splitting. - ctx.target_max_block_size = float("inf") + # Setting a huge block size effectively disables block splitting. 
+ ctx.target_max_block_size = 2**64 ds3 = ray.data.range(1000, parallelism=1).map(arrow_fn, **kwargs) nblocks = len(ds3.map(identity_fn, **kwargs).get_internal_block_refs()) assert nblocks == 1, nblocks diff --git a/python/ray/data/tests/test_streaming_backpressure_edge_case.py b/python/ray/data/tests/test_streaming_backpressure_edge_case.py deleted file mode 100644 index 223d1a1c4a0f..000000000000 --- a/python/ray/data/tests/test_streaming_backpressure_edge_case.py +++ /dev/null @@ -1,132 +0,0 @@ -import time - -import numpy as np -import pandas as pd -import pytest - -import ray -from ray._private.internal_api import memory_summary -from ray.data._internal.execution.backpressure_policy import ( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, - StreamingOutputBackpressurePolicy, -) -from ray.data.block import BlockMetadata -from ray.data.datasource import Datasource, ReadTask -from ray.data.tests.conftest import * # noqa -from ray.tests.conftest import * # noqa - - -def test_input_backpressure_e2e(restore_data_context, shutdown_only): - - # Tests that backpressure applies even when reading directly from the input - # datasource. This relies on datasource metadata size estimation. - @ray.remote - class Counter: - def __init__(self): - self.count = 0 - - def increment(self): - self.count += 1 - - def get(self): - return self.count - - def reset(self): - self.count = 0 - - class CountingRangeDatasource(Datasource): - def __init__(self): - self.counter = Counter.remote() - - def prepare_read(self, parallelism, n): - def range_(i): - ray.get(self.counter.increment.remote()) - return [ - pd.DataFrame({"data": np.ones((n // parallelism * 1024 * 1024,))}) - ] - - sz = (n // parallelism) * 1024 * 1024 * 8 - print("Block size", sz) - - return [ - ReadTask( - lambda i=i: range_(i), - BlockMetadata( - num_rows=n // parallelism, - size_bytes=sz, - schema=None, - input_files=None, - exec_stats=None, - ), - ) - for i in range(parallelism) - ] - - source = CountingRangeDatasource() - ctx = ray.data.DataContext.get_current() - ctx.execution_options.resource_limits.object_store_memory = 10e6 - ctx.set_config( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, - [ - StreamingOutputBackpressurePolicy, - ], - ) - - # 10GiB dataset. - ds = ray.data.read_datasource(source, n=10000, parallelism=1000) - it = iter(ds.iter_batches(batch_size=None, prefetch_batches=0)) - next(it) - time.sleep(3) - launched = ray.get(source.counter.get.remote()) - - # If backpressure is broken we'll launch 15+. 
- assert launched < 5, launched - - -def test_streaming_backpressure_e2e(restore_data_context): - - # This test case is particularly challenging since there is a large input->output - # increase in data size: https://github.com/ray-project/ray/issues/34041 - class TestSlow: - def __call__(self, df: np.ndarray): - time.sleep(2) - return {"id": np.random.randn(1, 20, 1024, 1024)} - - class TestFast: - def __call__(self, df: np.ndarray): - time.sleep(0.5) - return {"id": np.random.randn(1, 20, 1024, 1024)} - - ctx = ray.init(object_store_memory=4e9) - data_context = ray.data.DataContext.get_current() - data_context.set_config( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, - [ - StreamingOutputBackpressurePolicy, - ], - ) - ds = ray.data.range_tensor(20, shape=(3, 1024, 1024), parallelism=20) - - pipe = ds.map_batches( - TestFast, - batch_size=1, - num_cpus=0.5, - compute=ray.data.ActorPoolStrategy(size=2), - ).map_batches( - TestSlow, - batch_size=1, - compute=ray.data.ActorPoolStrategy(size=1), - ) - - for batch in pipe.iter_batches(batch_size=1, prefetch_batches=2): - ... - - # If backpressure is not working right, we will spill. - meminfo = memory_summary(ctx.address_info["address"], stats_only=True) - assert "Spilled" not in meminfo, meminfo - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 8111568044de..8cbe743f8ec4 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -54,6 +54,7 @@ def mock_resource_manager( get_downstream_object_store_memory=MagicMock( return_value=downstream_object_store_memory ), + op_resource_allocator_enabled=MagicMock(return_value=False), ) @@ -127,7 +128,8 @@ def test_process_completed_tasks(): # Test processing output bundles. assert len(topo[o1].outqueue) == 0, topo - process_completed_tasks(topo, [], 0) + resource_manager = mock_resource_manager() + process_completed_tasks(topo, resource_manager, 0) update_operator_states(topo) assert len(topo[o1].outqueue) == 20, topo @@ -138,7 +140,7 @@ def test_process_completed_tasks(): o2.get_active_tasks = MagicMock(return_value=[sleep_task, done_task]) o2.all_inputs_done = MagicMock() o1.mark_execution_completed = MagicMock() - process_completed_tasks(topo, [], 0) + process_completed_tasks(topo, resource_manager, 0) update_operator_states(topo) done_task_callback.assert_called_once() o2.all_inputs_done.assert_not_called() @@ -152,7 +154,7 @@ def test_process_completed_tasks(): o1.mark_execution_completed = MagicMock() o1.completed = MagicMock(return_value=True) topo[o1].outqueue.clear() - process_completed_tasks(topo, [], 0) + process_completed_tasks(topo, resource_manager, 0) update_operator_states(topo) done_task_callback.assert_called_once() o2.all_inputs_done.assert_called_once() @@ -170,7 +172,7 @@ def test_process_completed_tasks(): o3.mark_execution_completed() o2.mark_execution_completed = MagicMock() - process_completed_tasks(topo, [], 0) + process_completed_tasks(topo, resource_manager, 0) update_operator_states(topo) o2.mark_execution_completed.assert_called_once() @@ -189,6 +191,14 @@ def test_select_operator_to_run(): resource_manager = mock_resource_manager( global_limits=ExecutionResources(1, 1, 1), ) + memory_usage = { + o1: 0, + o2: 0, + o3: 0, + } + resource_manager.get_op_usage = MagicMock( + side_effect=lambda op: ExecutionResources(0, 0, memory_usage[op]) + ) # Test empty. 
assert ( @@ -198,8 +208,9 @@ def test_select_operator_to_run(): is None ) - # Test backpressure based on queue length between operators. + # Test backpressure based on memory_usage of each operator. topo[o1].outqueue.append(make_ref_bundle("dummy1")) + memory_usage[o1] += 1 assert ( select_operator_to_run( topo, resource_manager, [], True, "dummy", AutoscalingState() @@ -207,6 +218,7 @@ def test_select_operator_to_run(): == o2 ) topo[o1].outqueue.append(make_ref_bundle("dummy2")) + memory_usage[o1] += 1 assert ( select_operator_to_run( topo, resource_manager, [], True, "dummy", AutoscalingState() @@ -214,41 +226,7 @@ def test_select_operator_to_run(): == o2 ) topo[o2].outqueue.append(make_ref_bundle("dummy3")) - assert ( - select_operator_to_run( - topo, resource_manager, [], True, "dummy", AutoscalingState() - ) - == o3 - ) - - # Test backpressure includes num active tasks as well. - o3.num_active_tasks = MagicMock(return_value=2) - o3.internal_queue_size = MagicMock(return_value=0) - assert ( - select_operator_to_run( - topo, resource_manager, [], True, "dummy", AutoscalingState() - ) - == o2 - ) - # Internal queue size is added to num active tasks. - o3.num_active_tasks = MagicMock(return_value=0) - o3.internal_queue_size = MagicMock(return_value=2) - assert ( - select_operator_to_run( - topo, resource_manager, [], True, "dummy", AutoscalingState() - ) - == o2 - ) - o2.num_active_tasks = MagicMock(return_value=2) - o2.internal_queue_size = MagicMock(return_value=0) - assert ( - select_operator_to_run( - topo, resource_manager, [], True, "dummy", AutoscalingState() - ) - == o3 - ) - o2.num_active_tasks = MagicMock(return_value=0) - o2.internal_queue_size = MagicMock(return_value=2) + memory_usage[o2] += 1 assert ( select_operator_to_run( topo, resource_manager, [], True, "dummy", AutoscalingState() diff --git a/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py b/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py index cd1b4f3acb00..1e01fbdc77bb 100644 --- a/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py +++ b/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py @@ -565,6 +565,8 @@ def append_to_test_output_json(path, metrics): ctx = ray.data.context.DataContext.get_current() options.resource_limits.object_store_memory = 10e9 + # Disable resource reservation for maximum throughput. + ctx.op_resource_reservation_ratio = 0 datasets["train"] = build_dataset( args.data_root,