diff --git a/dashboard/memory_utils.py b/dashboard/memory_utils.py index 86691cb0f4db..ca26095adc2c 100644 --- a/dashboard/memory_utils.py +++ b/dashboard/memory_utils.py @@ -99,6 +99,12 @@ def __init__( self.node_address = node_address # object info + self.task_status = object_ref.get("taskStatus", "?") + if self.task_status == "NIL": + self.task_status = "-" + self.attempt_number = int(object_ref.get("attemptNumber", 0)) + if self.attempt_number > 0: + self.task_status = f"Attempt #{self.attempt_number + 1}: {self.task_status}" self.object_size = int(object_ref.get("objectSize", -1)) self.call_site = object_ref.get("callSite", "") self.object_ref = ray.ObjectRef( @@ -177,6 +183,7 @@ def as_dict(self): "object_size": self.object_size, "reference_type": self.reference_type, "call_site": self.call_site, + "task_status": self.task_status, "local_ref_count": self.local_ref_count, "pinned_in_memory": self.pinned_in_memory, "submitted_task_ref_count": self.submitted_task_ref_count, @@ -385,9 +392,14 @@ def memory_summary( # Fetch core memory worker stats, store as a dictionary core_worker_stats = [] for raylet in state.node_table(): - stats = node_stats_to_dict( - node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"]) - ) + if not raylet["Alive"]: + continue + try: + stats = node_stats_to_dict( + node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"]) + ) + except RuntimeError: + continue core_worker_stats.extend(stats["coreWorkersStats"]) assert type(stats) is dict and "coreWorkersStats" in stats @@ -407,7 +419,7 @@ def memory_summary( "Mem Used by Objects", "Local References", "Pinned", - "Pending Tasks", + "Used by task", "Captured in Objects", "Actor Handles", ] @@ -418,15 +430,16 @@ def memory_summary( "PID", "Type", "Call Site", + "Status", "Size", "Reference Type", "Object Ref", ] object_ref_string = "{:<13} | {:<8} | {:<7} | {:<9} \ -| {:<8} | {:<14} | {:<10}\n" +| {:<9} | {:<8} | {:<14} | {:<10}\n" if size > line_wrap_threshold and line_wrap: - object_ref_string = "{:<15} {:<5} {:<6} {:<22} {:<6} {:<18} \ + object_ref_string = "{:<15} {:<5} {:<6} {:<22} {:<14} {:<6} {:<18} \ {:<56}\n" mem += f"Grouping by {group_by}...\ @@ -469,7 +482,14 @@ def memory_summary( entry["call_site"][i : i + call_site_length] for i in range(0, len(entry["call_site"]), call_site_length) ] - num_lines = len(entry["call_site"]) + + task_status_length = 12 + entry["task_status"] = [ + entry["task_status"][i : i + task_status_length] + for i in range(0, len(entry["task_status"]), task_status_length) + ] + num_lines = max(len(entry["call_site"]), len(entry["task_status"])) + else: mem += "\n" object_ref_values = [ @@ -477,6 +497,7 @@ def memory_summary( entry["pid"], entry["type"], entry["call_site"], + entry["task_status"], entry["object_size"], entry["reference_type"], entry["object_ref"], diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py index d9d2f7557ebe..da6573c4aab1 100644 --- a/python/ray/tests/test_memstat.py +++ b/python/ray/tests/test_memstat.py @@ -6,6 +6,11 @@ import ray from ray.cluster_utils import Cluster, cluster_not_supported from ray.internal.internal_api import memory_summary +from ray._private.test_utils import ( + wait_for_condition, + Semaphore, +) + # RayConfig to enable recording call sites during ObjectRej creations. ray_config = {"record_ref_creation_sites": True} @@ -37,6 +42,11 @@ OBJECT_SIZE = "object size" REFERENCE_TYPE = "reference type" +# Task status. 
+WAITING_FOR_DEPENDENCIES = "WAITING_FOR_DEPENDENCIES" +SCHEDULED = "SCHEDULED" +FINISHED = "FINISHED" + def data_lines(memory_str): for line in memory_str.split("\n"): @@ -309,6 +319,56 @@ def test_memory_used_output(ray_start_regular): assert count(info, "8388861.0 B") == 2, info +def test_task_status(ray_start_regular): + address = ray_start_regular["address"] + + @ray.remote + def dep(sema, x=None): + ray.get(sema.acquire.remote()) + return + + # Filter out actor handle refs. + def filtered_summary(): + return "\n".join( + [ + line + for line in memory_summary(address, line_wrap=False).split("\n") + if "ACTOR_HANDLE" not in line + ] + ) + + sema = Semaphore.remote(value=0) + x = dep.remote(sema) + y = dep.remote(sema, x=x) + # x and its semaphore task are scheduled. + wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2) + wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 1) + + z = dep.remote(sema, x=x) + wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 2) + wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2) + wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 0) + + sema.release.remote() + time.sleep(2) + wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 1) + wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0) + # y, z, and two semaphore tasks are scheduled. + wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 4) + + sema.release.remote() + wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 2) + wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0) + wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 2) + + sema.release.remote() + ray.get(y) + ray.get(z) + wait_for_condition(lambda: count(filtered_summary(), FINISHED) == 3) + wait_for_condition(lambda: count(filtered_summary(), WAITING_FOR_DEPENDENCIES) == 0) + wait_for_condition(lambda: count(filtered_summary(), SCHEDULED) == 0) + + if __name__ == "__main__": import sys diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index cf1116cf9613..a7d7b07c9643 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -11,10 +11,17 @@ wait_for_condition, wait_for_pid_to_exit, SignalActor, + Semaphore, ) +from ray.internal.internal_api import memory_summary SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM +# Task status. +WAITING_FOR_DEPENDENCIES = "WAITING_FOR_DEPENDENCIES" +SCHEDULED = "SCHEDULED" +FINISHED = "FINISHED" + def test_cached_object(ray_start_cluster): config = { @@ -1014,6 +1021,85 @@ def dependent_task(x): ray.get(obj, timeout=60) +def test_memory_util(ray_start_cluster): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_period_milliseconds": 100, + "object_timeout_milliseconds": 200, + } + + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, + resources={"head": 1}, + _system_config=config, + enable_object_reconstruction=True, + ) + ray.init(address=cluster.address) + # Node to place the initial object. 
+    node_to_kill = cluster.add_node(
+        num_cpus=1, resources={"node1": 1}, object_store_memory=10 ** 8
+    )
+    cluster.wait_for_nodes()
+
+    @ray.remote
+    def large_object(sema=None):
+        if sema is not None:
+            ray.get(sema.acquire.remote())
+        return np.zeros(10 ** 7, dtype=np.uint8)
+
+    @ray.remote
+    def dependent_task(x, sema):
+        ray.get(sema.acquire.remote())
+        return x
+
+    def stats():
+        info = memory_summary(cluster.address, line_wrap=False)
+        info = info.split("\n")
+        reconstructing_waiting = [
+            line
+            for line in info
+            if "Attempt #2" in line and WAITING_FOR_DEPENDENCIES in line
+        ]
+        reconstructing_scheduled = [
+            line for line in info if "Attempt #2" in line and SCHEDULED in line
+        ]
+        reconstructing_finished = [
+            line for line in info if "Attempt #2" in line and FINISHED in line
+        ]
+        return (
+            len(reconstructing_waiting),
+            len(reconstructing_scheduled),
+            len(reconstructing_finished),
+        )
+
+    sema = Semaphore.options(resources={"head": 1}).remote(value=0)
+    obj = large_object.options(resources={"node1": 1}).remote(sema)
+    x = dependent_task.options(resources={"node1": 1}).remote(obj, sema)
+    ref = dependent_task.options(resources={"node1": 1}).remote(x, sema)
+    ray.get(sema.release.remote())
+    ray.get(sema.release.remote())
+    ray.get(sema.release.remote())
+    ray.get(ref)
+    wait_for_condition(lambda: stats() == (0, 0, 0))
+    del ref
+
+    cluster.remove_node(node_to_kill, allow_graceful=False)
+    node_to_kill = cluster.add_node(
+        num_cpus=1, resources={"node1": 1}, object_store_memory=10 ** 8
+    )
+
+    ref = dependent_task.remote(x, sema)
+    wait_for_condition(lambda: stats() == (1, 1, 0))
+    ray.get(sema.release.remote())
+    wait_for_condition(lambda: stats() == (0, 1, 1))
+    ray.get(sema.release.remote())
+    ray.get(sema.release.remote())
+    ray.get(ref)
+    wait_for_condition(lambda: stats() == (0, 0, 2))
+
+
 if __name__ == "__main__":
     import pytest
diff --git a/src/mock/ray/core_worker/task_manager.h b/src/mock/ray/core_worker/task_manager.h
index 9f345e9df4a4..f5b64ad775ab 100644
--- a/src/mock/ray/core_worker/task_manager.h
+++ b/src/mock/ray/core_worker/task_manager.h
@@ -43,6 +43,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface {
   MOCK_METHOD(absl::optional<TaskSpecification>, GetTaskSpec, (const TaskID &task_id),
               (const, override));
   MOCK_METHOD(bool, RetryTaskIfPossible, (const TaskID &task_id), (override));
+  MOCK_METHOD(void, MarkDependenciesResolved, (const TaskID &task_id), (override));
 };
 
 }  // namespace core
diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc
index 8720fa428fac..16de6db172b1 100644
--- a/src/ray/common/id.cc
+++ b/src/ray/common/id.cc
@@ -212,6 +212,14 @@ ActorID TaskID::ActorId() const {
       reinterpret_cast<const char *>(id_ + kUniqueBytesLength), ActorID::Size()));
 }
 
+bool TaskID::IsForActorCreationTask() const {
+  static std::string nil_data(kUniqueBytesLength, 0);
+  FillNil(&nil_data);
+  bool unique_bytes_nil = std::memcmp(id_, nil_data.data(), kUniqueBytesLength) == 0;
+  bool actor_id_nil = ActorId().IsNil();
+  return unique_bytes_nil && !actor_id_nil;
+}
+
 JobID TaskID::JobId() const { return ActorId().JobId(); }
 
 TaskID TaskID::ComputeDriverTaskId(const WorkerID &driver_id) {
diff --git a/src/ray/common/id.h b/src/ray/common/id.h
index b951b4304271..c20cffab8280 100644
--- a/src/ray/common/id.h
+++ b/src/ray/common/id.h
@@ -240,6 +240,9 @@ class TaskID : public BaseID<TaskID> {
   /// \return The `ActorID` of the actor which creates this task.
   ActorID ActorId() const;
 
+  /// Returns whether this is the ID of an actor creation task.
+ bool IsForActorCreationTask() const; + /// Get the id of the job to which this task belongs. /// /// \return The `JobID` of the job which creates this task. diff --git a/src/ray/common/id_test.cc b/src/ray/common/id_test.cc index e3e925965f08..c61191430eca 100644 --- a/src/ray/common/id_test.cc +++ b/src/ray/common/id_test.cc @@ -66,6 +66,13 @@ TEST(TaskIDTest, TestTaskID) { const TaskID task_id_1 = TaskID::ForActorTask(kDefaultJobId, kDefaultDriverTaskId, 1, actor_id); ASSERT_EQ(actor_id, task_id_1.ActorId()); + ASSERT_FALSE(task_id_1.IsForActorCreationTask()); + + auto actor_creation_task_id = TaskID::ForActorCreationTask(actor_id); + ASSERT_TRUE(actor_creation_task_id.IsForActorCreationTask()); + + ASSERT_FALSE(TaskID::Nil().IsForActorCreationTask()); + ASSERT_FALSE(TaskID::FromRandom(kDefaultJobId).IsForActorCreationTask()); } } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 41e9f40f3aa8..edcaf9613dbd 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2855,6 +2855,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & if (request.include_memory_info()) { reference_counter_->AddObjectRefStats(plasma_store_provider_->UsedObjectsList(), stats); + task_manager_->AddTaskStatusInfo(stats); } send_reply_callback(Status::OK(), nullptr, nullptr); diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index a81498f8f280..790eeba43c80 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -20,6 +20,11 @@ namespace ray { namespace core { bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { + if (object_id.TaskId().IsForActorCreationTask()) { + // The GCS manages all actor restarts, so we should never try to + // reconstruct an actor here. + return true; + } // Check the ReferenceCounter to see if there is a location for the object. bool owned_by_us = false; NodeID pinned_at; diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 9ca66137a562..313f752556e3 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -151,6 +151,12 @@ void ReferenceCounter::AddObjectRefStats( for (const auto &obj_id : ref.second.contained_in_owned) { ref_proto->add_contained_in_owned(obj_id.Binary()); } + + if (ref.second.owned_by_us && !ref.second.pending_creation) { + // For finished tasks only, we set the status here instead of in the + // TaskManager in case the task spec has already been GCed. + ref_proto->set_task_status(rpc::TaskStatus::FINISHED); + } } // Also include any unreferenced objects that are pinned in memory. for (const auto &entry : pinned_objects) { diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index e2b488ee29d2..2b4a370700bc 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -114,9 +114,9 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector *tas return false; } - if (!it->second.pending) { + if (!it->second.IsPending()) { resubmit = true; - it->second.pending = true; + it->second.status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; num_pending_tasks_++; // The task is pending again, so it's no longer counted as lineage. 
If @@ -193,7 +193,7 @@ bool TaskManager::IsTaskPending(const TaskID &task_id) const { if (it == submissible_tasks_.end()) { return false; } - return it->second.pending; + return it->second.IsPending(); } size_t TaskManager::NumSubmissibleTasks() const { @@ -308,7 +308,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, << " plasma returns in scope"; it->second.num_successful_executions++; - it->second.pending = false; + it->second.status = rpc::TaskStatus::FINISHED; num_pending_tasks_--; // A finished task can only be re-executed if it has some number of @@ -351,7 +351,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) { auto it = submissible_tasks_.find(task_id); RAY_CHECK(it != submissible_tasks_.end()) << "Tried to retry task that was not pending " << task_id; - RAY_CHECK(it->second.pending) + RAY_CHECK(it->second.IsPending()) << "Tried to retry task that was not pending " << task_id; spec = it->second.spec; num_retries_left = it->second.num_retries_left; @@ -393,7 +393,7 @@ void TaskManager::FailPendingTask(const TaskID &task_id, rpc::ErrorType error_ty auto it = submissible_tasks_.find(task_id); RAY_CHECK(it != submissible_tasks_.end()) << "Tried to fail task that was not pending " << task_id; - RAY_CHECK(it->second.pending) + RAY_CHECK(it->second.IsPending()) << "Tried to fail task that was not pending " << task_id; spec = it->second.spec; submissible_tasks_.erase(it); @@ -529,7 +529,7 @@ int64_t TaskManager::RemoveLineageReference(const ObjectID &object_id, << it->second.reconstructable_return_ids.size() << " plasma returns in scope"; - if (it->second.reconstructable_return_ids.empty() && !it->second.pending) { + if (it->second.reconstructable_return_ids.empty() && !it->second.IsPending()) { // If the task can no longer be retried, decrement the lineage ref count // for each of the task's args. 
 for (size_t i = 0; i < it->second.spec.NumArgs(); i++) {
@@ -592,12 +592,38 @@ std::vector<TaskID> TaskManager::GetPendingChildrenTasks(
   std::vector<TaskID> ret_vec;
   absl::MutexLock lock(&mu_);
   for (auto it : submissible_tasks_) {
-    if ((it.second.pending) && (it.second.spec.ParentTaskId() == parent_task_id)) {
+    if (it.second.IsPending() && (it.second.spec.ParentTaskId() == parent_task_id)) {
       ret_vec.push_back(it.first);
     }
   }
   return ret_vec;
 }
 
+void TaskManager::AddTaskStatusInfo(rpc::CoreWorkerStats *stats) const {
+  absl::MutexLock lock(&mu_);
+  for (int64_t i = 0; i < stats->object_refs_size(); i++) {
+    auto ref = stats->mutable_object_refs(i);
+    const auto obj_id = ObjectID::FromBinary(ref->object_id());
+    const auto task_id = obj_id.TaskId();
+    const auto it = submissible_tasks_.find(task_id);
+    if (it == submissible_tasks_.end()) {
+      continue;
+    }
+    ref->set_task_status(it->second.status);
+    ref->set_attempt_number(it->second.spec.AttemptNumber());
+  }
+}
+
+void TaskManager::MarkDependenciesResolved(const TaskID &task_id) {
+  absl::MutexLock lock(&mu_);
+  auto it = submissible_tasks_.find(task_id);
+  if (it == submissible_tasks_.end()) {
+    return;
+  }
+  if (it->second.status == rpc::TaskStatus::WAITING_FOR_DEPENDENCIES) {
+    it->second.status = rpc::TaskStatus::SCHEDULED;
+  }
+}
+
 }  // namespace core
 }  // namespace ray
diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h
index 160bc54594a1..b22f4dc7e9e1 100644
--- a/src/ray/core_worker/task_manager.h
+++ b/src/ray/core_worker/task_manager.h
@@ -20,6 +20,7 @@
 #include "ray/common/id.h"
 #include "ray/common/task/task.h"
 #include "ray/core_worker/store_provider/memory_store/memory_store.h"
+#include "src/ray/protobuf/common.pb.h"
 #include "src/ray/protobuf/core_worker.pb.h"
 #include "src/ray/protobuf/gcs.pb.h"
@@ -47,6 +48,8 @@ class TaskFinisherInterface {
       const std::vector<ObjectID> &inlined_dependency_ids,
       const std::vector<ObjectID> &contained_ids) = 0;
 
+  virtual void MarkDependenciesResolved(const TaskID &task_id) = 0;
+
   virtual bool MarkTaskCanceled(const TaskID &task_id) = 0;
 
   virtual void MarkTaskReturnObjectsFailed(
@@ -230,6 +233,19 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
     return total_lineage_footprint_bytes_;
   }
 
+  /// Record that the given task's dependencies have been created and the task
+  /// can now be scheduled for execution.
+  ///
+  /// \param[in] task_id The task that is now scheduled.
+  void MarkDependenciesResolved(const TaskID &task_id) override;
+
+  /// Add debug information about the current task status for the ObjectRefs
+  /// included in the given stats.
+  ///
+  /// \param[out] stats Will be populated with objects' current task status, if
+  /// any.
+  void AddTaskStatusInfo(rpc::CoreWorkerStats *stats) const;
+
  private:
   struct TaskEntry {
     TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg,
@@ -239,6 +255,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
         reconstructable_return_ids.insert(spec.ReturnId(i));
       }
     }
+
+    bool IsPending() const { return status != rpc::TaskStatus::FINISHED; }
+
     /// The task spec. This is pinned as long as the following are true:
     /// - The task is still pending execution. This means that the task may
     ///   fail and so it may be retried in the future.
@@ -256,10 +275,8 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
     int num_retries_left;
     // Number of times this task successfully completed execution so far.
int num_successful_executions = 0; - // Whether this task is currently pending execution. This is used to pin - // the task entry if the task is still pending but all of its return IDs - // are out of scope. - bool pending = true; + // The task's current execution status. + rpc::TaskStatus status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; // Objects returned by this task that are reconstructable. This is set // initially to the task's return objects, since if the task fails, these // objects may be reconstructed by resubmitting the task. Once the task diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 55652390a006..5501c3913508 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -125,6 +125,8 @@ class MockTaskFinisher : public TaskFinisherInterface { return task; } + void MarkDependenciesResolved(const TaskID &task_id) override {} + int num_tasks_complete = 0; int num_tasks_failed = 0; int num_inlined_dependencies = 0; diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc index a0a9ef335175..54ff6212713c 100644 --- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc +++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc @@ -94,8 +94,10 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe // We must release the lock before resolving the task dependencies since // the callback may get called in the same call stack. auto actor_id = task_spec.ActorId(); + auto task_id = task_spec.TaskId(); resolver_.ResolveDependencies( - task_spec, [this, send_pos, actor_id](Status status) { + task_spec, [this, send_pos, actor_id, task_id](Status status) { + task_finisher_.MarkDependenciesResolved(task_id); absl::MutexLock lock(&mu_); auto queue = client_queues_.find(actor_id); RAY_CHECK(queue != client_queues_.end()); diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 67f364abca9b..40cb5aed6c91 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -24,6 +24,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { num_tasks_submitted_++; resolver_.ResolveDependencies(task_spec, [this, task_spec](Status status) { + task_finisher_->MarkDependenciesResolved(task_spec.TaskId()); if (!status.ok()) { RAY_LOG(WARNING) << "Resolving task dependencies failed " << status.ToString(); RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 837eb2e00fe1..f75ccba130ea 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -477,6 +477,18 @@ message ViewData { repeated Measure measures = 2; } +enum TaskStatus { + // We don't have a status for this task because we are not the owner or the + // task metadata has already been deleted. + NIL = 0; + // The task is waiting for its dependencies to be created. + WAITING_FOR_DEPENDENCIES = 1; + // All dependencies have been created and the task is scheduled to execute. + SCHEDULED = 2; + // The task finished successfully. + FINISHED = 3; +} + // Debug info for a referenced object. message ObjectRefInfo { // Object id that is referenced. 
@@ -493,6 +505,11 @@ message ObjectRefInfo { repeated bytes contained_in_owned = 6; // True if this object is pinned in memory by the current process. bool pinned_in_memory = 7; + // Status of the task that creates this object. + TaskStatus task_status = 8; + // A count of the number of times this task has been attempted so far. 0 + // means this is the first execution. + uint64 attempt_number = 9; } // Details about the allocation of a given resource. Some resources
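
Illustrative usage (not part of the diff; a minimal sketch under stated assumptions): with these changes applied, the per-object task status added to ObjectRefInfo surfaces as a "Status" column in memory_summary / `ray memory`, showing WAITING_FOR_DEPENDENCIES, SCHEDULED, or FINISHED, with an "Attempt #N:" prefix once attempt_number is greater than 0. The snippet assumes the internal memory_summary import path and the "address" key on the ray.init() result, mirroring the test code above; the task names are made up for the example.

    import time
    import ray
    from ray.internal.internal_api import memory_summary

    addr_info = ray.init()

    @ray.remote
    def slow(x=None):
        # Keep the task running so its return object stays pending.
        time.sleep(60)
        return x

    a = slow.remote()   # dependencies resolved, expected to show SCHEDULED
    b = slow.remote(a)  # blocked on `a`, expected to show WAITING_FOR_DEPENDENCIES
    time.sleep(1)
    print(memory_summary(addr_info["address"], line_wrap=False))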