Skip to content

Commit

Permalink
[core] Add task and object reconstruction status to ray memory (ray-p…
Browse files Browse the repository at this point in the history
…roject#22317)

Improve observability for general objects and lineage reconstruction by adding a "Status" field to `ray memory`. The value of the field can be:
```
  // The task is waiting for its dependencies to be created.
  WAITING_FOR_DEPENDENCIES = 1;
  // All dependencies have been created and the task is scheduled to execute.
  SCHEDULED = 2;
  // The task finished successfully.
  FINISHED = 3;
```

In addition, tasks that failed or that needed to be re-executed due to lineage reconstruction will have a field listing the attempt number. Example output:
```
IP Address    | PID      | Type    | Call Site | Status    | Size     | Reference Type | Object Ref
192.168.4.22  | 279475   | Driver  | (task call) ... | Attempt #2: FINISHED | 10000254.0 B | LOCAL_REFERENCE | c2668a65bda616c1ffffffffffffffffffffffff0100000001000000


```
  • Loading branch information
stephanie-wang authored Feb 23, 2022
1 parent 9261428 commit abf2a70
Show file tree
Hide file tree
Showing 16 changed files with 283 additions and 20 deletions.
35 changes: 28 additions & 7 deletions dashboard/memory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ def __init__(
self.node_address = node_address

# object info
self.task_status = object_ref.get("taskStatus", "?")
if self.task_status == "NIL":
self.task_status = "-"
self.attempt_number = int(object_ref.get("attemptNumber", 0))
if self.attempt_number > 0:
self.task_status = f"Attempt #{self.attempt_number + 1}: {self.task_status}"
self.object_size = int(object_ref.get("objectSize", -1))
self.call_site = object_ref.get("callSite", "<Unknown>")
self.object_ref = ray.ObjectRef(
Expand Down Expand Up @@ -177,6 +183,7 @@ def as_dict(self):
"object_size": self.object_size,
"reference_type": self.reference_type,
"call_site": self.call_site,
"task_status": self.task_status,
"local_ref_count": self.local_ref_count,
"pinned_in_memory": self.pinned_in_memory,
"submitted_task_ref_count": self.submitted_task_ref_count,
Expand Down Expand Up @@ -385,9 +392,14 @@ def memory_summary(
# Fetch core memory worker stats, store as a dictionary
core_worker_stats = []
for raylet in state.node_table():
stats = node_stats_to_dict(
node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
)
if not raylet["Alive"]:
continue
try:
stats = node_stats_to_dict(
node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
)
except RuntimeError:
continue
core_worker_stats.extend(stats["coreWorkersStats"])
assert type(stats) is dict and "coreWorkersStats" in stats

Expand All @@ -407,7 +419,7 @@ def memory_summary(
"Mem Used by Objects",
"Local References",
"Pinned",
"Pending Tasks",
"Used by task",
"Captured in Objects",
"Actor Handles",
]
Expand All @@ -418,15 +430,16 @@ def memory_summary(
"PID",
"Type",
"Call Site",
"Status",
"Size",
"Reference Type",
"Object Ref",
]
object_ref_string = "{:<13} | {:<8} | {:<7} | {:<9} \
| {:<8} | {:<14} | {:<10}\n"
| {:<9} | {:<8} | {:<14} | {:<10}\n"

if size > line_wrap_threshold and line_wrap:
object_ref_string = "{:<15} {:<5} {:<6} {:<22} {:<6} {:<18} \
object_ref_string = "{:<15} {:<5} {:<6} {:<22} {:<14} {:<6} {:<18} \
{:<56}\n"

mem += f"Grouping by {group_by}...\
Expand Down Expand Up @@ -469,14 +482,22 @@ def memory_summary(
entry["call_site"][i : i + call_site_length]
for i in range(0, len(entry["call_site"]), call_site_length)
]
num_lines = len(entry["call_site"])

task_status_length = 12
entry["task_status"] = [
entry["task_status"][i : i + task_status_length]
for i in range(0, len(entry["task_status"]), task_status_length)
]
num_lines = max(len(entry["call_site"]), len(entry["task_status"]))

else:
mem += "\n"
object_ref_values = [
entry["node_ip_address"],
entry["pid"],
entry["type"],
entry["call_site"],
entry["task_status"],
entry["object_size"],
entry["reference_type"],
entry["object_ref"],
Expand Down
60 changes: 60 additions & 0 deletions python/ray/tests/test_memstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
import ray
from ray.cluster_utils import Cluster, cluster_not_supported
from ray.internal.internal_api import memory_summary
from ray._private.test_utils import (
wait_for_condition,
Semaphore,
)


# RayConfig to enable recording call sites during ObjectRej creations.
ray_config = {"record_ref_creation_sites": True}
Expand Down Expand Up @@ -37,6 +42,11 @@
OBJECT_SIZE = "object size"
REFERENCE_TYPE = "reference type"

# Task status.
WAITING_FOR_DEPENDENCIES = "WAITING_FOR_DEPENDENCIES"
SCHEDULED = "SCHEDULED"
FINISHED = "FINISHED"


def data_lines(memory_str):
for line in memory_str.split("\n"):
Expand Down Expand Up @@ -309,6 +319,56 @@ def test_memory_used_output(ray_start_regular):
assert count(info, "8388861.0 B") == 2, info


def test_task_status(ray_start_regular):
address = ray_start_regular["address"]

@ray.remote
def dep(sema, x=None):
ray.get(sema.acquire.remote())
return

# Filter out actor handle refs.
def filtered_summary():
return "\n".join(
[
line
for line in memory_summary(address, line_wrap=False).split("\n")
if "ACTOR_HANDLE" not in line
]
)

sema = Semaphore.remote(value=0)
x = dep.remote(sema)
y = dep.remote(sema, x=x)
# x and its semaphore task are scheduled.
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2)
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 1)

z = dep.remote(sema, x=x)
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 2)
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2)
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 0)

sema.release.remote()
time.sleep(2)
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 1)
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0)
# y, z, and two semaphore tasks are scheduled.
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 4)

sema.release.remote()
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 2)
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0)
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2)

sema.release.remote()
ray.get(y)
ray.get(z)
wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 3)
wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0)
wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 0)


if __name__ == "__main__":
import sys

Expand Down
86 changes: 86 additions & 0 deletions python/ray/tests/test_reconstruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@
wait_for_condition,
wait_for_pid_to_exit,
SignalActor,
Semaphore,
)
from ray.internal.internal_api import memory_summary

SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM

# Task status.
WAITING_FOR_DEPENDENCIES = "WAITING_FOR_DEPENDENCIES"
SCHEDULED = "SCHEDULED"
FINISHED = "FINISHED"


def test_cached_object(ray_start_cluster):
config = {
Expand Down Expand Up @@ -1014,6 +1021,85 @@ def dependent_task(x):
ray.get(obj, timeout=60)


def test_memory_util(ray_start_cluster):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"object_timeout_milliseconds": 200,
}

cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
resources={"head": 1},
_system_config=config,
enable_object_reconstruction=True,
)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10 ** 8
)
cluster.wait_for_nodes()

@ray.remote
def large_object(sema=None):
if sema is not None:
ray.get(sema.acquire.remote())
return np.zeros(10 ** 7, dtype=np.uint8)

@ray.remote
def dependent_task(x, sema):
ray.get(sema.acquire.remote())
return x

def stats():
info = memory_summary(cluster.address, line_wrap=False)
info = info.split("\n")
reconstructing_waiting = [
line
for line in info
if "Attempt #2" in line and WAITING_FOR_DEPENDENCIES in line
]
reconstructing_scheduled = [
line for line in info if "Attempt #2" in line and SCHEDULED in line
]
reconstructing_finished = [
line for line in info if "Attempt #2" in line and FINISHED in line
]
return (
len(reconstructing_waiting),
len(reconstructing_scheduled),
len(reconstructing_finished),
)

sema = Semaphore.options(resources={"head": 1}).remote(value=0)
obj = large_object.options(resources={"node1": 1}).remote(sema)
x = dependent_task.options(resources={"node1": 1}).remote(obj, sema)
ref = dependent_task.options(resources={"node1": 1}).remote(x, sema)
ray.get(sema.release.remote())
ray.get(sema.release.remote())
ray.get(sema.release.remote())
ray.get(ref)
wait_for_condition(lambda: stats() == (0, 0, 0))
del ref

cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10 ** 8
)

ref = dependent_task.remote(x, sema)
wait_for_condition(lambda: stats() == (1, 1, 0))
ray.get(sema.release.remote())
wait_for_condition(lambda: stats() == (0, 1, 1))
ray.get(sema.release.remote())
ray.get(sema.release.remote())
ray.get(ref)
wait_for_condition(lambda: stats() == (0, 0, 2))


if __name__ == "__main__":
import pytest

Expand Down
1 change: 1 addition & 0 deletions src/mock/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
MOCK_METHOD(absl::optional<TaskSpecification>, GetTaskSpec, (const TaskID &task_id),
(const, override));
MOCK_METHOD(bool, RetryTaskIfPossible, (const TaskID &task_id), (override));
MOCK_METHOD(void, MarkDependenciesResolved, (const TaskID &task_id), (override));
};

} // namespace core
Expand Down
8 changes: 8 additions & 0 deletions src/ray/common/id.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ ActorID TaskID::ActorId() const {
reinterpret_cast<const char *>(id_ + kUniqueBytesLength), ActorID::Size()));
}

bool TaskID::IsForActorCreationTask() const {
static std::string nil_data(kUniqueBytesLength, 0);
FillNil(&nil_data);
bool unique_bytes_nil = std::memcmp(id_, nil_data.data(), kUniqueBytesLength) == 0;
bool actor_id_nil = ActorId().IsNil();
return unique_bytes_nil && !actor_id_nil;
}

JobID TaskID::JobId() const { return ActorId().JobId(); }

TaskID TaskID::ComputeDriverTaskId(const WorkerID &driver_id) {
Expand Down
3 changes: 3 additions & 0 deletions src/ray/common/id.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ class TaskID : public BaseID<TaskID> {
/// \return The `ActorID` of the actor which creates this task.
ActorID ActorId() const;

/// Returns whether this is the ID of an actor creation task.
bool IsForActorCreationTask() const;

/// Get the id of the job to which this task belongs.
///
/// \return The `JobID` of the job which creates this task.
Expand Down
7 changes: 7 additions & 0 deletions src/ray/common/id_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ TEST(TaskIDTest, TestTaskID) {
const TaskID task_id_1 =
TaskID::ForActorTask(kDefaultJobId, kDefaultDriverTaskId, 1, actor_id);
ASSERT_EQ(actor_id, task_id_1.ActorId());
ASSERT_FALSE(task_id_1.IsForActorCreationTask());

auto actor_creation_task_id = TaskID::ForActorCreationTask(actor_id);
ASSERT_TRUE(actor_creation_task_id.IsForActorCreationTask());

ASSERT_FALSE(TaskID::Nil().IsForActorCreationTask());
ASSERT_FALSE(TaskID::FromRandom(kDefaultJobId).IsForActorCreationTask());
}
}

Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
if (request.include_memory_info()) {
reference_counter_->AddObjectRefStats(plasma_store_provider_->UsedObjectsList(),
stats);
task_manager_->AddTaskStatusInfo(stats);
}

send_reply_callback(Status::OK(), nullptr, nullptr);
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ namespace ray {
namespace core {

bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
if (object_id.TaskId().IsForActorCreationTask()) {
// The GCS manages all actor restarts, so we should never try to
// reconstruct an actor here.
return true;
}
// Check the ReferenceCounter to see if there is a location for the object.
bool owned_by_us = false;
NodeID pinned_at;
Expand Down
6 changes: 6 additions & 0 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ void ReferenceCounter::AddObjectRefStats(
for (const auto &obj_id : ref.second.contained_in_owned) {
ref_proto->add_contained_in_owned(obj_id.Binary());
}

if (ref.second.owned_by_us && !ref.second.pending_creation) {
// For finished tasks only, we set the status here instead of in the
// TaskManager in case the task spec has already been GCed.
ref_proto->set_task_status(rpc::TaskStatus::FINISHED);
}
}
// Also include any unreferenced objects that are pinned in memory.
for (const auto &entry : pinned_objects) {
Expand Down
Loading

0 comments on commit abf2a70

Please sign in to comment.