From ef7907c237e4b8a37e68283210a2afed32e9e961 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 10 Jan 2023 14:09:35 -0800 Subject: [PATCH 1/7] initial change with WIP test Signed-off-by: Scott Lee --- python/ray/data/_internal/plan.py | 10 ++++++++-- python/ray/data/tests/test_dataset.py | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 165d7abea374..27167e239299 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -332,9 +332,15 @@ def execute( Returns: The blocks of the output dataset. """ + context = DatasetContext.get_current() + if ray.available_resources().get("CPU", None) is None: + logger.get_logger(log_to_stdout=context.enable_auto_log_stats).warning( + "Warning: The Ray cluster currently does not have " + "any available CPUs. The Dataset job will hang unless more CPUs " + "are freed up. A common reason is that cluster resources are " + "used by Actors or Tune trials, see link for more details." + ) if not self.has_computed_output(): - context = DatasetContext.get_current() - # Read stage is handled with the legacy execution impl for now. if ( context.new_execution_backend diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index d9b991b5f8a4..9ae0364b9a26 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -4,6 +4,7 @@ import random import signal import time +from unittest.mock import patch import numpy as np import pandas as pd @@ -14,6 +15,7 @@ import ray from ray._private.test_utils import wait_for_condition from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType +from ray.data._internal.dataset_logger import DatasetLogger from ray.data._internal.stats import _StatsActor from ray.data._internal.arrow_block import ArrowRow from ray.data._internal.block_builder import BlockBuilder @@ -5430,6 +5432,26 @@ def test_ragged_tensors(ray_start_regular_shared): ] +def test_warning_execute_with_no_cpu(ray_start_cluster): + cluster = ray_start_cluster + # Create one node with no CPUs to trigger the Dataset warning + cluster.add_node( + resources={"foo": 100}, + num_cpus=0, + ) + ray.init(cluster.address) + + logger = DatasetLogger("ray.data._internal.plan").get_logger( + log_to_stdout=True, + ) + with patch.object(logger, "warning") as mock_logger: + ds = ray.data.range(10) + ds = ds.map_batches(lambda x: x * 2) + ds.take() + logger_args, logger_kwargs = mock_logger.call_args + assert "Warning: The Ray cluster currently does not have " in logger_args[0] + + if __name__ == "__main__": import sys From 401a6f4ce7a91bd2a2267d3de0847b437b3b0505 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 10 Jan 2023 14:14:24 -0800 Subject: [PATCH 2/7] add docs link Signed-off-by: Scott Lee --- python/ray/data/_internal/plan.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 27167e239299..4f67426f03cd 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -338,7 +338,8 @@ def execute( "Warning: The Ray cluster currently does not have " "any available CPUs. The Dataset job will hang unless more CPUs " "are freed up. A common reason is that cluster resources are " - "used by Actors or Tune trials, see link for more details." + "used by Actors or Tune trials, see the link below for more details:" + "https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune" # noqa: E501 ) if not self.has_computed_output(): # Read stage is handled with the legacy execution impl for now. From c44d4daa66ea139af2d3abb3181aefe0e4bba4fb Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 10 Jan 2023 17:35:43 -0800 Subject: [PATCH 3/7] remove stuck unit test Signed-off-by: Scott Lee --- python/ray/data/tests/test_dataset.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 9ae0364b9a26..96fab6d7010e 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5432,26 +5432,6 @@ def test_ragged_tensors(ray_start_regular_shared): ] -def test_warning_execute_with_no_cpu(ray_start_cluster): - cluster = ray_start_cluster - # Create one node with no CPUs to trigger the Dataset warning - cluster.add_node( - resources={"foo": 100}, - num_cpus=0, - ) - ray.init(cluster.address) - - logger = DatasetLogger("ray.data._internal.plan").get_logger( - log_to_stdout=True, - ) - with patch.object(logger, "warning") as mock_logger: - ds = ray.data.range(10) - ds = ds.map_batches(lambda x: x * 2) - ds.take() - logger_args, logger_kwargs = mock_logger.call_args - assert "Warning: The Ray cluster currently does not have " in logger_args[0] - - if __name__ == "__main__": import sys From 9b325cacd162821da8a9364a727b24d119d37dec Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 10 Jan 2023 17:51:44 -0800 Subject: [PATCH 4/7] format and clean up Signed-off-by: Scott Lee --- python/ray/data/tests/test_dataset.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 96fab6d7010e..d9b991b5f8a4 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -4,7 +4,6 @@ import random import signal import time -from unittest.mock import patch import numpy as np import pandas as pd @@ -15,7 +14,6 @@ import ray from ray._private.test_utils import wait_for_condition from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType -from ray.data._internal.dataset_logger import DatasetLogger from ray.data._internal.stats import _StatsActor from ray.data._internal.arrow_block import ArrowRow from ray.data._internal.block_builder import BlockBuilder From 1164bbf4deff2bac84b1b79f6862af258a61305e Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 10 Jan 2023 18:03:36 -0800 Subject: [PATCH 5/7] comments Signed-off-by: Scott Lee --- python/ray/data/_internal/plan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 4f67426f03cd..d2a6b151b3e1 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -333,7 +333,7 @@ def execute( The blocks of the output dataset. """ context = DatasetContext.get_current() - if ray.available_resources().get("CPU", None) is None: + if not ray.available_resources().get("CPU"): logger.get_logger(log_to_stdout=context.enable_auto_log_stats).warning( "Warning: The Ray cluster currently does not have " "any available CPUs. The Dataset job will hang unless more CPUs " From f2abfa24a0caacf5d6a83554355eac44de6171d4 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 11 Jan 2023 14:03:36 -0800 Subject: [PATCH 6/7] add mocked test Signed-off-by: Scott Lee --- python/ray/data/tests/test_dataset.py | 56 +++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index d9b991b5f8a4..1be184156c01 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -4,6 +4,7 @@ import random import signal import time +from unittest.mock import patch import numpy as np import pandas as pd @@ -14,6 +15,7 @@ import ray from ray._private.test_utils import wait_for_condition from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType +from ray.data._internal.dataset_logger import DatasetLogger from ray.data._internal.stats import _StatsActor from ray.data._internal.arrow_block import ArrowRow from ray.data._internal.block_builder import BlockBuilder @@ -5430,6 +5432,60 @@ def test_ragged_tensors(ray_start_regular_shared): ] +class LoggerWarningCalled(Exception): + """Custom exception used in test_warning_execute_with_no_cpu() and + test_nowarning_execute_with_cpu(). Raised when the `logger.warning` method + is called, so that we can kick out of `plan.execute()` by catching this Exception + and check logging was done properly.""" + + pass + + +def test_warning_execute_with_no_cpu(ray_start_cluster): + """Tests ExecutionPlan.execute() to ensure a warning is logged + when no CPU resources are available.""" + # Create one node with no CPUs to trigger the Dataset warning + cluster = ray_start_cluster + cluster.add_node(num_cpus=0) + ray.init(cluster.address) + + logger = DatasetLogger("ray.data._internal.plan").get_logger() + with patch.object( + logger, + "warning", + side_effect=LoggerWarningCalled, + ) as mock_logger: + try: + ds = ray.data.range(10) + ds = ds.map_batches(lambda x: x) + ds.take() + except LoggerWarningCalled: + logger_args, logger_kwargs = mock_logger.call_args + assert "Warning: The Ray cluster currently does not have " in logger_args[0] + + +def test_nowarning_execute_with_cpu(ray_start_cluster): + """Tests ExecutionPlan.execute() to ensure no warning is logged + when there are available CPU resources.""" + # Create one node with CPUs to avoid triggering the Dataset warning + cluster = ray_start_cluster + cluster.add_node(num_cpus=1) + ray.init(cluster.address) + + logger = DatasetLogger("ray.data._internal.plan").get_logger() + cluster.add_node(num_cpus=1) + + with patch.object( + logger, + "warning", + side_effect=LoggerWarningCalled, + ) as mock_logger: + ds = ray.data.range(10) + ds = ds.map_batches(lambda x: x) + ds.take() + mock_logger.assert_not_called() + + if __name__ == "__main__": import sys From 5824d565085203041fed46f18e0b9d5b0c5719f8 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 11 Jan 2023 16:50:57 -0800 Subject: [PATCH 7/7] update tests and clean up Signed-off-by: Scott Lee --- python/ray/data/_internal/plan.py | 5 +++-- python/ray/data/tests/test_dataset.py | 9 ++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 17f6ec9bebef..2199204d5862 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -337,11 +337,12 @@ def execute( """ context = DatasetContext.get_current() if not ray.available_resources().get("CPU"): - logger.get_logger(log_to_stdout=context.enable_auto_log_stats).warning( + logger.get_logger().warning( "Warning: The Ray cluster currently does not have " "any available CPUs. The Dataset job will hang unless more CPUs " "are freed up. A common reason is that cluster resources are " - "used by Actors or Tune trials, see the link below for more details:" + "used by Actors or Tune trials; see the following link " + "for more details: " "https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune" # noqa: E501 ) if not self.has_computed_output(): diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 4ba197d44d78..43c88b939390 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5428,7 +5428,6 @@ def test_warning_execute_with_no_cpu(ray_start_cluster): # Create one node with no CPUs to trigger the Dataset warning cluster = ray_start_cluster cluster.add_node(num_cpus=0) - ray.init(cluster.address) logger = DatasetLogger("ray.data._internal.plan").get_logger() with patch.object( @@ -5445,17 +5444,13 @@ def test_warning_execute_with_no_cpu(ray_start_cluster): assert "Warning: The Ray cluster currently does not have " in logger_args[0] -def test_nowarning_execute_with_cpu(ray_start_cluster): +def test_nowarning_execute_with_cpu(ray_start_cluster_init): """Tests ExecutionPlan.execute() to ensure no warning is logged when there are available CPU resources.""" # Create one node with CPUs to avoid triggering the Dataset warning - cluster = ray_start_cluster - cluster.add_node(num_cpus=1) - ray.init(cluster.address) + ray.init(ray_start_cluster_init.address) logger = DatasetLogger("ray.data._internal.plan").get_logger() - cluster.add_node(num_cpus=1) - with patch.object( logger, "warning",