[Datasets] Emit warning when starting Dataset execution with no CPU resources available #31574

Merged · 9 commits · Jan 12, 2023
python/ray/data/_internal/plan.py (12 changes: 10 additions & 2 deletions)
@@ -335,9 +335,17 @@ def execute(
        Returns:
            The blocks of the output dataset.
        """
        context = DatasetContext.get_current()
        if not ray.available_resources().get("CPU"):
            logger.get_logger().warning(
                "Warning: The Ray cluster currently does not have "
                "any available CPUs. The Dataset job will hang unless more CPUs "
                "are freed up. A common reason is that cluster resources are "
                "used by Actors or Tune trials; see the following link "
                "for more details: "
                "https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune"  # noqa: E501
            )
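
As an aside, here is a minimal, hypothetical sketch of the situation this check is meant to catch (the `CpuHog` actor and the single-CPU local cluster are illustrative assumptions, not part of this PR): the CPUs exist but are all reserved, so `ray.available_resources().get("CPU")` comes back falsy and any Dataset execution would hang.

```python
# Hypothetical reproduction sketch: an actor reserves the cluster's only CPU,
# leaving Datasets with nothing to schedule their tasks on.
import ray

ray.init(num_cpus=1)


@ray.remote(num_cpus=1)
class CpuHog:
    """Placeholder actor that holds the single CPU for its whole lifetime."""

    def ping(self):
        return "ok"


hog = CpuHog.remote()
ray.get(hog.ping.remote())  # the actor is now scheduled and pinning the CPU

# With the CPU reserved, the "CPU" key is absent (or 0.0), which is exactly
# the falsy condition the new check above looks for.
print(ray.available_resources().get("CPU"))

# Any Dataset execution started now would hang until the CPU is freed, e.g.:
# ray.data.range(10).map_batches(lambda x: x).take()
# Killing the actor releases the CPU again.
ray.kill(hog)
```

This mirrors the Actors/Tune scenario the warning message links to: the resources are not missing, they are all reserved.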
Contributor:

Could we add a unit test for this warning?

Contributor Author:

@ericl I added a note in the PR description about my attempts at unit testing -- let me know if you have suggestions on how to get around the issue I was facing, or any other questions.

Contributor:

However, I realized that unit testing this would be challenging, because the test will get stuck on `ds.take()`. Since the initial Ray cluster is set up with no CPU resources, and there's no way to add more resources mid-execution, it will always hit the `ray.available_resources().get("CPU", None) is None` case and be unable to finish executing the dataset.

You can run it in a separate thread as a workaround.
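
For completeness, a rough sketch of that threaded workaround, assuming the same zero-CPU cluster discussed above (`_consume` is a made-up helper, not part of the Ray API):

```python
# Hypothetical sketch of the "run it in a separate thread" workaround: keep the
# blocking Dataset execution off the main test thread so the test can still
# assert on the logged warning and exit.
import threading

import ray


def _consume():
    # On a cluster with no CPUs this blocks indefinitely.
    ds = ray.data.range(10).map_batches(lambda x: x)
    ds.take()


t = threading.Thread(target=_consume, daemon=True)
t.start()
t.join(timeout=5)  # don't wait for completion; the warning fires when execution starts

# The test would then assert that the no-CPU warning was logged (e.g. with
# pytest's caplog fixture or a patched logger) while the daemon thread stays
# blocked and is cleaned up when the test process exits.
```

The PR ended up taking a different route (the Mock-based tests below), but this is the shape of the suggestion.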

Contributor Author:

I ran into some trouble using the threading library, but as an alternative I was able to leverage Mock, raising a dummy exception via `Mock.side_effect`, to write two simplified unit tests.

        if not self.has_computed_output():
            context = DatasetContext.get_current()

            # Read stage is handled with the legacy execution impl for now.
            if (
                context.new_execution_backend
python/ray/data/tests/test_dataset.py (51 changes: 51 additions & 0 deletions)
@@ -4,6 +4,7 @@
import random
import signal
import time
from unittest.mock import patch

import numpy as np
import pandas as pd
@@ -14,6 +15,7 @@
import ray
from ray._private.test_utils import wait_for_condition
from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.stats import _StatsActor
from ray.data._internal.arrow_block import ArrowRow
from ray.data._internal.block_builder import BlockBuilder
@@ -5411,6 +5413,55 @@ def test_ragged_tensors(ray_start_regular_shared):
]


class LoggerWarningCalled(Exception):
    """Custom exception used in test_warning_execute_with_no_cpu() and
    test_nowarning_execute_with_cpu(). Raised when the `logger.warning` method
    is called, so that we can kick out of `plan.execute()` by catching this Exception
    and check logging was done properly."""

    pass


def test_warning_execute_with_no_cpu(ray_start_cluster):
    """Tests ExecutionPlan.execute() to ensure a warning is logged
    when no CPU resources are available."""
    # Create one node with no CPUs to trigger the Dataset warning
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=0)

    logger = DatasetLogger("ray.data._internal.plan").get_logger()
    with patch.object(
        logger,
        "warning",
        side_effect=LoggerWarningCalled,
    ) as mock_logger:
        try:
            ds = ray.data.range(10)
            ds = ds.map_batches(lambda x: x)
            ds.take()
        except LoggerWarningCalled:
            logger_args, logger_kwargs = mock_logger.call_args
            assert "Warning: The Ray cluster currently does not have " in logger_args[0]


def test_nowarning_execute_with_cpu(ray_start_cluster_init):
    """Tests ExecutionPlan.execute() to ensure no warning is logged
    when there are available CPU resources."""
    # Create one node with CPUs to avoid triggering the Dataset warning
    ray.init(ray_start_cluster_init.address)

    logger = DatasetLogger("ray.data._internal.plan").get_logger()
    with patch.object(
        logger,
        "warning",
        side_effect=LoggerWarningCalled,
    ) as mock_logger:
        ds = ray.data.range(10)
        ds = ds.map_batches(lambda x: x)
        ds.take()
        mock_logger.assert_not_called()


if __name__ == "__main__":
    import sys
