Warn on resource deadlock; improve object store error messages #5555

Merged · 8 commits · Aug 30, 2019
10 changes: 8 additions & 2 deletions python/ray/exceptions.py
@@ -105,8 +105,14 @@ def __init__(self, object_id):
self.object_id = object_id

def __str__(self):
return ("Object {} is lost (either evicted or explicitly deleted) and "
+ "cannot be reconstructed.").format(self.object_id.hex())
return (
"Object {} is lost (either LRU evicted or deleted by user) and "
"cannot be reconstructed. Try increasing the object store "
"memory available with ray.init(object_store_memory=<bytes>) "
"or setting object store limits with "
"ray.remote(object_store_memory=<bytes>). See also: {}".format(
self.object_id.hex(),
"https://ray.readthedocs.io/en/latest/memory-management.html"))


RAY_EXCEPTION_TYPES = [
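For context, the two settings the new message points to can be used as follows. This is a rough sketch and the byte values are purely illustrative.

import ray

# Give the node a larger object store overall (value is illustrative).
ray.init(object_store_memory=2 * 1024**3)  # 2 GiB

# Or cap the object store memory a single actor may use.
@ray.remote(object_store_memory=200 * 1024**2)  # 200 MiB, illustrative
class Worker(object):
    def process(self, x):
        return x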
1 change: 1 addition & 0 deletions python/ray/ray_constants.py
@@ -113,6 +113,7 @@ def to_memory_units(memory_bytes, round_up):
WORKER_POOL_LARGE_ERROR = "worker_pool_large"
PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction"
INFEASIBLE_TASK_ERROR = "infeasible_task"
RESOURCE_DEADLOCK_ERROR = "resource_deadlock"
REMOVED_NODE_ERROR = "node_removed"
MONITOR_DIED_ERROR = "monitor_died"
LOG_MONITOR_DIED_ERROR = "log_monitor_died"
21 changes: 21 additions & 0 deletions python/ray/tests/test_failure.py
@@ -527,6 +527,27 @@ def __init__(self):
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2)


def test_warning_for_resource_deadlock(shutdown_only):
    # Check that we get warning messages for resource deadlocks.
ray.init(num_cpus=1)

@ray.remote(num_cpus=1)
class Foo(object):
def f(self):
return 0

@ray.remote
def f():
# Creating both actors is not possible.
actors = [Foo.remote() for _ in range(2)]
for a in actors:
ray.get(a.f.remote())

# Run in a task to check we handle the blocked task case correctly
f.remote()
wait_for_errors(ray_constants.RESOURCE_DEADLOCK_ERROR, 1, timeout=30)


def test_warning_for_infeasible_tasks(ray_start_regular):
# Check that we get warning messages for infeasible tasks.

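For contrast with the test above, here is a sketch of the same workload sized so that it does not deadlock. The CPU counts are illustrative; the point is that the driving task plus both actors must fit in the cluster's resources.

import ray

ray.init(num_cpus=3)

@ray.remote(num_cpus=1)
class Foo(object):
    def f(self):
        return 0

@ray.remote
def f():
    # With 3 CPUs, this task (1 CPU) and both actors (1 CPU each) all fit.
    actors = [Foo.remote() for _ in range(2)]
    return ray.get([a.f.remote() for a in actors])

print(ray.get(f.remote()))  # [0, 0]; completes instead of deadlocking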
22 changes: 15 additions & 7 deletions python/ray/worker.py
@@ -398,13 +398,20 @@ def put_object(self, object_id, value):
break
except pyarrow.plasma.PlasmaStoreFull as plasma_exc:
if attempt:
-                    logger.debug(
-                        "Waiting {} secs for plasma to drain.".format(delay))
+                    logger.warning("Waiting {} seconds for space to free up "
+                                   "in the object store.".format(delay))
time.sleep(delay)
delay *= 2
else:
self.dump_object_store_memory_usage()
raise plasma_exc

def dump_object_store_memory_usage(self):
"""Prints object store debug string to stdout."""
msg = "\n" + self.plasma_client.debug_string()
msg = msg.replace("\n", "\nplasma: ")
logger.warning("Local object store memory usage:\n{}\n".format(msg))

def _try_store_and_register(self, object_id, value):
"""Wraps `store_and_register` with cases for existence and pickling.

@@ -1007,14 +1014,13 @@ def _set_plasma_client_options(self, client_name, object_store_memory):
self.plasma_client.set_client_options(client_name,
object_store_memory)
except pyarrow._plasma.PlasmaStoreFull:
self.dump_object_store_memory_usage()
raise memory_monitor.RayOutOfMemoryError(
"Failed to set object_store_memory={} for {}. The "
"plasma store may have insufficient memory remaining "
"to satisfy this limit (30% of object store memory is "
"permanently reserved for shared usage). The current "
"object store memory status is:\n\n{}".format(
object_store_memory, client_name,
self.plasma_client.debug_string()))
"permanently reserved for shared usage).".format(
object_store_memory, client_name))

def _handle_process_task_failure(self, function_descriptor,
return_object_ids, error, backtrace):
@@ -1788,7 +1794,7 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
# Delay it a bit to see if we can suppress it
task_error_queue.put((error_message, time.time()))
else:
-                logger.error(error_message)
+                logger.warn(error_message)
except (OSError, redis.exceptions.ConnectionError) as e:
logger.error("listen_error_messages_raylet: {}".format(e))
finally:
@@ -2329,6 +2335,8 @@ def get(object_ids):
for i, value in enumerate(values):
if isinstance(value, RayError):
last_task_error_raise_time = time.time()
if isinstance(value, ray.exceptions.UnreconstructableError):
worker.dump_object_store_memory_usage()
raise value

# Run post processors.
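The put_object hunk above only changes the log message and adds a memory dump before re-raising; the surrounding retry loop sits outside the hunk. As a rough, self-contained sketch of the retry-with-exponential-backoff pattern it relies on (the function name, defaults, and stand-in exception are illustrative, not Ray's actual code):

import time

def put_with_backoff(put_fn, retries=5, initial_delay=1.0):
    # Retry a store operation with exponential backoff, warning on each retry
    # and surfacing the error once the final attempt fails. MemoryError stands
    # in for pyarrow.plasma.PlasmaStoreFull here.
    delay = initial_delay
    for attempt in reversed(range(retries)):
        try:
            return put_fn()
        except MemoryError:
            if attempt:
                print("Waiting {} seconds for space to free up "
                      "in the object store.".format(delay))
                time.sleep(delay)
                delay *= 2
            else:
                raise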
73 changes: 70 additions & 3 deletions src/ray/raylet/node_manager.cc
@@ -336,6 +336,7 @@ void NodeManager::Heartbeat() {
static_cast<int64_t>(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) {
DumpDebugState();
RecordMetrics();
WarnResourceDeadlock();
Review comment (Contributor):
Let's call this in DispatchTasks instead, and only call it if we weren't able to dispatch any tasks. Otherwise, we'll end up pushing an error every heartbeat for as long as the deadlock is happening (which is probably forever).

Review comment (Contributor, author):
I tried doing this, but it ended up printing out too many false positives. The issue is that you only want to fire the warning if there has been a significant delay, and right after DispatchTasks is not it (though if you wait even a tiny bit of time resources could immediately free up, like if a task returns right after creating an actor).

I think printing forever is fine -- it is deadlocked after all.

Review comment (Contributor):
Hmm, maybe I'm misunderstanding how DispatchTasks works, but if DispatchTasks doesn't succeed in scheduling anything, and the conditions that you check are true (no running tasks), then doesn't that mean it'll never succeed again? I think when this happens, it can only be because there are no available workers or all the cores are taken up by actors. We can make sure it's the second case if we also check that there are no resources available.

Review comment (Contributor, author):
Not necessarily; a block/unblock could free up resources. Here is the example I was testing:

import ray

@ray.remote(num_cpus=1)
class A:
    def f(self): pass


@ray.remote
def f():
    a = A.remote()
    b = A.remote()
    c = A.remote()
    print("get 1")
    ray.get(a.f.remote())
    print("get 2")
    ray.get(b.f.remote())
    print("get 3")
    ray.get(c.f.remote())

ray.init(num_cpus=2)
ray.get(f.remote())

last_debug_dump_at_ms_ = now_ms;
}

@@ -347,6 +348,69 @@ void NodeManager::Heartbeat() {
});
}

void NodeManager::WarnResourceDeadlock() {
// Check if any progress is being made on this raylet.
for (const auto &task : local_queues_.GetTasks(TaskState::RUNNING)) {
// Ignore blocked tasks.
if (local_queues_.GetBlockedTaskIds().count(task.GetTaskSpecification().TaskId())) {
continue;
}
// Progress is being made, don't warn.
resource_deadlock_warned_ = false;
return;
}

  // Suppress duplicate warning messages.
if (resource_deadlock_warned_) {
return;
}

Review comment (Contributor):
Maybe add an additional check that there are no resources available on the local node. The RUNNING queue can also be empty if there are no workers available (because they haven't started yet or have all died).

Review comment (Contributor, author):
I thought about that, but it could be that the actor is not placeable due to the size of its resource request. We could try checking for that too, but it seems simpler to rely on the periodic delay to avoid the warning firing too often.

// The node is full of actors and no progress has been made for some time.
// If there are any pending tasks, build a warning.
std::ostringstream error_message;
ray::Task exemplar;
bool should_warn = false;
int pending_actor_creations = 0;
int pending_tasks = 0;

// See if any tasks are blocked trying to acquire resources.
for (const auto &task : local_queues_.GetTasks(TaskState::READY)) {
const TaskSpecification &spec = task.GetTaskSpecification();
if (spec.IsActorCreationTask()) {
pending_actor_creations += 1;
} else {
pending_tasks += 1;
}
if (!should_warn) {
exemplar = task;
should_warn = true;
}
}

  // Push a warning to the driver that a task is blocked trying to acquire resources.
if (should_warn) {
const auto &my_client_id = gcs_client_->client_table().GetLocalClientId();
SchedulingResources &local_resources = cluster_resource_map_[my_client_id];
error_message
<< "The actor or task with ID " << exemplar.GetTaskSpecification().TaskId()
<< " is pending and cannot currently be scheduled. It requires "
<< exemplar.GetTaskSpecification().GetRequiredResources().ToString()
<< " for execution and "
<< exemplar.GetTaskSpecification().GetRequiredPlacementResources().ToString()
<< " for placement, but this node only has remaining "
<< local_resources.GetAvailableResources().ToString() << ". In total there are "
<< pending_tasks << " pending tasks and " << pending_actor_creations
<< " pending actors on this node. "
<< "This is likely due to all cluster resources being claimed by actors. "
<< "To resolve the issue, consider creating fewer actors or increase the "
ericl marked this conversation as resolved.
Show resolved Hide resolved
<< "resources available to this Ray cluster.";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
exemplar.GetTaskSpecification().JobId(), "resource_deadlock", error_message.str(),
current_time_ms()));
resource_deadlock_warned_ = true;
}
}

void NodeManager::GetObjectManagerProfileInfo() {
int64_t start_time_ms = current_time_ms();

@@ -1330,12 +1394,15 @@ void NodeManager::ScheduleTasks(
std::string type = "infeasible_task";
std::ostringstream error_message;
error_message
<< "The task with ID " << task.GetTaskSpecification().TaskId()
<< " is infeasible and cannot currently be executed. It requires "
<< "The actor or task with ID " << task.GetTaskSpecification().TaskId()
<< " is infeasible and cannot currently be scheduled. It requires "
<< task.GetTaskSpecification().GetRequiredResources().ToString()
<< " for execution and "
<< task.GetTaskSpecification().GetRequiredPlacementResources().ToString()
<< " for placement. Check the client table to view node resources.";
<< " for placement, however there are no nodes in the cluster that can "
<< "provide the requested resources. To resolve this issue, consider "
<< "reducing the resource requests of this task or add nodes that "
<< "can fit the task.";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
task.GetTaskSpecification().JobId(), type, error_message.str(),
current_time_ms()));
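To contrast the new infeasible-task message with the resource-deadlock warning above, a minimal way to trigger it is to request resources that no node can ever provide; the numbers below are illustrative.

import ray

ray.init(num_cpus=2, num_gpus=0)

# No node in this cluster can ever provide 100 GPUs, so the raylet pushes the
# "infeasible and cannot currently be scheduled" error to the driver.
@ray.remote(num_gpus=100)
def train():
    return "done"

train.remote()  # never scheduled; the warning shows up in the driver logs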
6 changes: 6 additions & 0 deletions src/ray/raylet/node_manager.h
@@ -492,6 +492,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
rpc::ForwardTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Push an error to the driver if this node is full of actors and so we are
/// unable to schedule new tasks or actors at all.
void WarnResourceDeadlock();

// GCS client ID for this node.
ClientID client_id_;
boost::asio::io_service &io_service_;
@@ -510,6 +514,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
std::chrono::milliseconds heartbeat_period_;
/// The period between debug state dumps.
int64_t debug_dump_period_;
/// Whether we have printed out a resource deadlock warning.
bool resource_deadlock_warned_ = false;
/// The path to the ray temp dir.
std::string temp_dir_;
/// The timer used to get profiling information from the object manager and