From 3f2502b7c5fca1cb417a3221ac2a8453ef0c8171 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 9 Sep 2022 14:31:01 -0700 Subject: [PATCH 01/17] wip Signed-off-by: Eric Liang --- python/ray/_private/prometheus_exporter.py | 2 +- src/ray/core_worker/core_worker_process.cc | 1 + src/ray/core_worker/task_manager.cc | 3 ++- src/ray/core_worker/task_manager.h | 26 ++++++++++++++++++++-- src/ray/stats/metric_defs.cc | 8 +++++++ src/ray/stats/metric_defs.h | 3 +++ src/ray/stats/tag_defs.cc | 2 ++ src/ray/stats/tag_defs.h | 2 ++ 8 files changed, 43 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/prometheus_exporter.py b/python/ray/_private/prometheus_exporter.py index 19aa4b396f7f..52890b816792 100644 --- a/python/ray/_private/prometheus_exporter.py +++ b/python/ray/_private/prometheus_exporter.py @@ -144,7 +144,7 @@ def to_metric(self, desc, tag_values, agg_data): metric_description = desc["documentation"] label_keys = desc["labels"] metric_units = desc["units"] - assert len(tag_values) == len(label_keys) + assert len(tag_values) == len(label_keys), (tag_values, label_keys) # Prometheus requires that all tag values be strings hence # the need to cast none to the empty string before exporting. See # https://github.com/census-instrumentation/opencensus-python/issues/480 diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 61acec534a74..2ad3cfcf9c1a 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -123,6 +123,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) // Initialize stats in core worker global tags. const ray::stats::TagsType global_tags = { {ray::stats::ComponentKey, "core_worker"}, + {ray::stats::WorkerIdKey, worker_id_.Hex()}, {ray::stats::VersionKey, kRayVersion}, {ray::stats::NodeAddressKey, options_.node_ip_address}}; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index ec4991c23f11..7f5b312ff11f 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -98,7 +98,8 @@ std::vector TaskManager::AddPendingTask( { absl::MutexLock lock(&mu_); auto inserted = submissible_tasks_.emplace(spec.TaskId(), - TaskEntry(spec, max_retries, num_returns)); + TaskEntry(spec, max_retries, num_returns, + task_counter_)); RAY_CHECK(inserted.second); num_pending_tasks_++; } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 044ee6b141bb..2c87c3e7ea3e 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -20,6 +20,7 @@ #include "ray/common/id.h" #include "ray/common/task/task.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" +#include "ray/stats/metric_defs.h" #include "src/ray/protobuf/common.pb.h" #include "src/ray/protobuf/core_worker.pb.h" #include "src/ray/protobuf/gcs.pb.h" @@ -27,6 +28,15 @@ namespace ray { namespace core { +class TaskStatusCounter { + public: + void Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status) { + } + void Increment(rpc::TaskStatus status) { + ray::stats::STATS_tasks.Record(1, "RUNNING"); + } +}; + class TaskFinisherInterface { public: virtual void CompletePendingTask(const TaskID &task_id, @@ -276,11 +286,18 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa struct TaskEntry { TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg, - size_t num_returns) - : spec(spec_arg), num_retries_left(num_retries_left_arg) { + size_t num_returns, + TaskStatusCounter& counter) + : spec(spec_arg), num_retries_left(num_retries_left_arg), counter(counter) { for (size_t i = 0; i < num_returns; i++) { reconstructable_return_ids.insert(spec.ReturnId(i)); } + counter.Increment(rpc::TaskStatus::WAITING_FOR_DEPENDENCIES); + } + + void SetStatus(rpc::TaskStatus new_status) { + counter.Swap(status, new_status); + status = new_status; } bool IsPending() const { return status != rpc::TaskStatus::FINISHED; } @@ -304,6 +321,8 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // Number of times this task may be resubmitted. If this reaches 0, then // the task entry may be erased. int num_retries_left; + // Reference to the task stats tracker. + TaskStatusCounter& counter; // Number of times this task successfully completed execution so far. int num_successful_executions = 0; // The task's current execution status. @@ -382,6 +401,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// Protects below fields. mutable absl::Mutex mu_; + /// Tracks per-task-state counters for metric purposes. + TaskStatusCounter task_counter_ GUARDED_BY(mu_); + /// This map contains one entry per task that may be submitted for /// execution. This includes both tasks that are currently pending execution /// and tasks that finished execution but that may be retried again in the diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc index 29b4e40d49f9..53e80b6ec072 100644 --- a/src/ray/stats/metric_defs.cc +++ b/src/ray/stats/metric_defs.cc @@ -31,6 +31,14 @@ namespace stats { /// NOTE: When adding a new metric, add the metric name to the _METRICS list in /// python/ray/tests/test_metrics_agent.py to ensure that its existence is tested. +/// Scheduler +DEFINE_stats( + tasks, + "Cumulative number of tasks currently in a particular state.", + ("State"), + (), + ray::stats::GAUGE); + /// Event stats DEFINE_stats(operation_count, "operation count", ("Method"), (), ray::stats::GAUGE); DEFINE_stats( diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index 934f1431e772..ec2f3dc443d4 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -42,6 +42,9 @@ namespace stats { /// ray_[component]_[metrics_name]_total (e.g., ray_pull_manager_total) /// +/// Task stats from core workers. +DECLARE_stats(tasks); + /// Event stats DECLARE_stats(operation_count); DECLARE_stats(operation_run_time_ms); diff --git a/src/ray/stats/tag_defs.cc b/src/ray/stats/tag_defs.cc index d33cea1f3fab..bba4925b3c02 100644 --- a/src/ray/stats/tag_defs.cc +++ b/src/ray/stats/tag_defs.cc @@ -35,5 +35,7 @@ const TagKeyType DriverPidKey = TagKeyType::Register("DriverPid"); const TagKeyType ResourceNameKey = TagKeyType::Register("ResourceName"); const TagKeyType ActorIdKey = TagKeyType::Register("ActorId"); + +const TagKeyType WorkerIdKey = TagKeyType::Register("WorkerId"); } // namespace stats } // namespace ray diff --git a/src/ray/stats/tag_defs.h b/src/ray/stats/tag_defs.h index 89e9ef864a70..d51485582bc0 100644 --- a/src/ray/stats/tag_defs.h +++ b/src/ray/stats/tag_defs.h @@ -39,3 +39,5 @@ extern const TagKeyType DriverPidKey; extern const TagKeyType ResourceNameKey; extern const TagKeyType ActorIdKey; + +extern const TagKeyType WorkerIdKey; From 695de2af27749045b3f54328cd8197f527bd7303 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 9 Sep 2022 16:34:21 -0700 Subject: [PATCH 02/17] working Signed-off-by: Eric Liang --- python/ray/_private/prometheus_exporter.py | 1 + src/ray/core_worker/core_worker.cc | 5 +++++ src/ray/core_worker/task_manager.cc | 18 +++++++++--------- src/ray/core_worker/task_manager.h | 21 ++++++++++++++++++--- src/ray/gcs/gcs_server/gcs_server_main.cc | 1 + src/ray/raylet/main.cc | 1 + 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/python/ray/_private/prometheus_exporter.py b/python/ray/_private/prometheus_exporter.py index 52890b816792..d5284c20d2b6 100644 --- a/python/ray/_private/prometheus_exporter.py +++ b/python/ray/_private/prometheus_exporter.py @@ -295,6 +295,7 @@ def emit(self, view_data): # pragma: NO COVER for v_data in view_data: if v_data.tag_value_aggregation_data_map: self.collector.add_view_data(v_data) + print("ADD VIEW DATA", v_data) def serve_http(self): """serve_http serves the Prometheus endpoint.""" diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5b8fbb804918..02fa2c9b4e6e 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -29,6 +29,7 @@ #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/gcs/pb_util.h" +#include "ray/stats/metric_defs.h" #include "ray/stats/stats.h" #include "ray/util/event.h" #include "ray/util/util.h" @@ -2183,6 +2184,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrowed_refs, bool *is_retryable_error) { + ray::stats::STATS_tasks.Record(1, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)); + ray::stats::STATS_tasks.Record(-1, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString(); task_queue_length_ -= 1; num_executed_tasks_ += 1; @@ -2340,6 +2343,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, RAY_LOG(FATAL) << "Unexpected task status type : " << status; } + ray::stats::STATS_tasks.Record(0, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)); + ray::stats::STATS_tasks.Record(0, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); return status; } diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 7f5b312ff11f..c7c8b0ffa6ba 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -122,7 +122,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector *tas if (!it->second.IsPending()) { resubmit = true; - it->second.status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; + it->second.SetStatus(rpc::TaskStatus::WAITING_FOR_DEPENDENCIES); num_pending_tasks_++; // The task is pending again, so it's no longer counted as lineage. If @@ -340,7 +340,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, << " plasma returns in scope"; it->second.num_successful_executions++; - it->second.status = rpc::TaskStatus::FINISHED; + it->second.SetStatus(rpc::TaskStatus::FINISHED); num_pending_tasks_--; // A finished task can only be re-executed if it has some number of @@ -392,7 +392,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) { } else { RAY_CHECK(num_retries_left == 0 || num_retries_left == -1); } - it->second.status = rpc::TaskStatus::SCHEDULED; + it->second.SetStatus(rpc::TaskStatus::SCHEDULED); } // We should not hold the lock during these calls because they may trigger @@ -653,7 +653,7 @@ void TaskManager::AddTaskStatusInfo(rpc::CoreWorkerStats *stats) const { if (it == submissible_tasks_.end()) { continue; } - ref->set_task_status(it->second.status); + ref->set_task_status(it->second.GetStatus()); ref->set_attempt_number(it->second.spec.AttemptNumber()); } } @@ -664,8 +664,8 @@ void TaskManager::MarkDependenciesResolved(const TaskID &task_id) { if (it == submissible_tasks_.end()) { return; } - if (it->second.status == rpc::TaskStatus::WAITING_FOR_DEPENDENCIES) { - it->second.status = rpc::TaskStatus::SCHEDULED; + if (it->second.GetStatus() == rpc::TaskStatus::WAITING_FOR_DEPENDENCIES) { + it->second.SetStatus(rpc::TaskStatus::SCHEDULED); } } @@ -675,8 +675,8 @@ void TaskManager::MarkTaskWaitingForExecution(const TaskID &task_id) { if (it == submissible_tasks_.end()) { return; } - RAY_CHECK(it->second.status == rpc::TaskStatus::SCHEDULED); - it->second.status = rpc::TaskStatus::WAITING_FOR_EXECUTION; + RAY_CHECK(it->second.GetStatus() == rpc::TaskStatus::SCHEDULED); + it->second.SetStatus(rpc::TaskStatus::WAITING_FOR_EXECUTION); } void TaskManager::FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply, @@ -693,7 +693,7 @@ void TaskManager::FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply, const auto &task_entry = task_it.second; auto entry = reply->add_owned_task_info_entries(); const auto &task_spec = task_entry.spec; - const auto &task_state = task_entry.status; + const auto &task_state = task_entry.GetStatus(); rpc::TaskType type; if (task_spec.IsNormalTask()) { type = rpc::TaskType::NORMAL_TASK; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 2c87c3e7ea3e..44ce54dffd19 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -28,13 +28,22 @@ namespace ray { namespace core { +// TODO(ekl) move to cc file class TaskStatusCounter { public: void Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status) { + counters_[old_status] -= 1; + counters_[new_status] += 1; + RAY_CHECK(counters_[old_status] >= 0); + ray::stats::STATS_tasks.Record(counters_[old_status], rpc::TaskStatus_Name(old_status)); + ray::stats::STATS_tasks.Record(counters_[new_status], rpc::TaskStatus_Name(new_status)); } void Increment(rpc::TaskStatus status) { - ray::stats::STATS_tasks.Record(1, "RUNNING"); + counters_[status] += 1; + ray::stats::STATS_tasks.Record(counters_[status], rpc::TaskStatus_Name(status)); } + private: + int64_t counters_[rpc::TaskStatus_ARRAYSIZE] = {}; }; class TaskFinisherInterface { @@ -300,6 +309,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa status = new_status; } + rpc::TaskStatus GetStatus() const { + return status; + } + bool IsPending() const { return status != rpc::TaskStatus::FINISHED; } bool IsWaitingForExecution() const { @@ -325,8 +338,6 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa TaskStatusCounter& counter; // Number of times this task successfully completed execution so far. int num_successful_executions = 0; - // The task's current execution status. - rpc::TaskStatus status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; // Objects returned by this task that are reconstructable. This is set // initially to the task's return objects, since if the task fails, these // objects may be reconstructed by resubmitting the task. Once the task @@ -344,6 +355,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // lineage. We cache this because the task spec protobuf can mutate // out-of-band. int64_t lineage_footprint_bytes = 0; + + private: + // The task's current execution status. + rpc::TaskStatus status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; }; /// Remove a lineage reference to this object ID. This should be called diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index 0f47226b8971..1d747ba296a8 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -66,6 +66,7 @@ int main(int argc, char *argv[]) { const ray::stats::TagsType global_tags = { {ray::stats::ComponentKey, "gcs_server"}, + {ray::stats::WorkerIdKey, ""}, {ray::stats::VersionKey, kRayVersion}, {ray::stats::NodeAddressKey, node_ip_address}}; ray::stats::Init(global_tags, metrics_agent_port); diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index fb6cf7631344..62aba43dbc3a 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -258,6 +258,7 @@ int main(int argc, char *argv[]) { // Initialize stats. const ray::stats::TagsType global_tags = { {ray::stats::ComponentKey, "raylet"}, + {ray::stats::WorkerIdKey, ""}, {ray::stats::VersionKey, kRayVersion}, {ray::stats::NodeAddressKey, node_ip_address}}; ray::stats::Init(global_tags, metrics_agent_port); From 6a5dda1e372339541f928d69f957c5aca497f94f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 12 Sep 2022 15:33:08 -0700 Subject: [PATCH 03/17] wip tests Signed-off-by: Eric Liang --- python/ray/_private/test_utils.py | 9 +++++ python/ray/tests/test_task_metrics.py | 54 +++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 python/ray/tests/test_task_metrics.py diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 180ac7650c02..ea187b14daa3 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -14,6 +14,7 @@ import time import timeit import traceback +from collections import defaultdict from contextlib import contextmanager, redirect_stderr, redirect_stdout from typing import Any, Dict, List, Optional @@ -819,6 +820,14 @@ def fetch_prometheus(prom_addresses): return components_dict, metric_names, metric_samples +def fetch_prometheus_metrics(prom_addresses): + _, _, samples = fetch_prometheus(prom_addresses) + samples_by_name = defaultdict(list) + for sample in samples: + samples_by_name[sample.name].append(sample) + return samples_by_name + + def load_test_config(config_file_name): """Loads a config yaml from tests/test_cli_patterns.""" here = os.path.realpath(__file__) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py new file mode 100644 index 000000000000..ca87d533a761 --- /dev/null +++ b/python/ray/tests/test_task_metrics.py @@ -0,0 +1,54 @@ +from collections import defaultdict +import os +import time + +import pytest + +import ray +from ray._private.test_utils import ( + fetch_prometheus_metrics, + wait_for_condition, +) + + +def tasks_by_state(info) -> dict: + metrics_page = "localhost:{}".format(info["metrics_export_port"]) + res = fetch_prometheus_metrics([metrics_page]) + if "ray_tasks" in res: + states = defaultdict(int) + for sample in res["ray_tasks"]: + states[sample.labels["State"]] += sample.value + print("Tasks by state: {}".format(states)) + return states + else: + print("No task metrics yet.") + return {} + + +def test_task_basic(shutdown_only): + info = ray.init(num_cpus=2) + + @ray.remote + def f(): + time.sleep(999) + + refs = [f.remote() for _ in range(10)] + + expected = { + "RUNNING": 2.0, + "WAITING_FOR_EXECUTION": 0.0, + "SCHEDULED": 8.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + } + # TODO(ekl) optimize the reporting interval to be faster for testing + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + + +if __name__ == "__main__": + import sys + + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) From a800128a581b2c77cfc3ade67fe102fa525fdda8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 12 Sep 2022 15:36:07 -0700 Subject: [PATCH 04/17] add todos Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index ca87d533a761..71a6792a239a 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -45,6 +45,12 @@ def f(): lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) +# TODO(ekl) test wait on deps +# TODO(ekl) test actor tasks waiting for execution (queued vs running) +# TODO(ekl) test finished with success / error +# TODO(ekl) test wait on object store transfer (??) + + if __name__ == "__main__": import sys From c4dc617ee1f3fd5610d4fdc25c254c022a02a979 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 12 Sep 2022 15:57:41 -0700 Subject: [PATCH 05/17] add more tests Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 73 +++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 71a6792a239a..170c497bb17e 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -45,9 +45,76 @@ def f(): lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) -# TODO(ekl) test wait on deps -# TODO(ekl) test actor tasks waiting for execution (queued vs running) -# TODO(ekl) test finished with success / error +def test_task_wait_on_deps(shutdown_only): + info = ray.init(num_cpus=2) + + @ray.remote + def f(): + time.sleep(999) + + @ray.remote + def g(x): + time.sleep(999) + + x = f.remote() + refs = [g.remote(x) for _ in range(10)] + + expected = { + "RUNNING": 1.0, + "WAITING_FOR_EXECUTION": 0.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 10.0, + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + + +def test_actor_tasks_queued(shutdown_only): + info = ray.init(num_cpus=2) + + @ray.remote + class F: + def f(self): + time.sleep(999) + + a = F.remote() + refs = [a.f.remote() for _ in range(10)] + + expected = { + "RUNNING": 1.0, + "WAITING_FOR_EXECUTION": 9.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + "FINISHED": 1.0, + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + + +def test_task_finish(shutdown_only): + info = ray.init(num_cpus=2) + + @ray.remote + def f(): + return "ok" + + @ray.remote + def g(): + assert False + + refs = (f.remote(), g.remote()) + + expected = { + "RUNNING": 0.0, + "WAITING_FOR_EXECUTION": 0.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + "FINISHED": 2.0, + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + + # TODO(ekl) test wait on object store transfer (??) From 7260bdb20f4b6ab3936014b3b82709ba9e29e472 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 12 Sep 2022 15:58:08 -0700 Subject: [PATCH 06/17] lint Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 20 +++++++----- src/ray/core_worker/core_worker.cc | 6 ++-- src/ray/core_worker/task_manager.cc | 5 ++- src/ray/core_worker/task_manager.h | 45 ++++++++++++++------------- src/ray/stats/metric_defs.cc | 11 +++---- 5 files changed, 46 insertions(+), 41 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 170c497bb17e..f87c607077dc 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -32,7 +32,7 @@ def test_task_basic(shutdown_only): def f(): time.sleep(999) - refs = [f.remote() for _ in range(10)] + [f.remote() for _ in range(10)] expected = { "RUNNING": 2.0, @@ -42,7 +42,8 @@ def f(): } # TODO(ekl) optimize the reporting interval to be faster for testing wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + ) def test_task_wait_on_deps(shutdown_only): @@ -57,7 +58,7 @@ def g(x): time.sleep(999) x = f.remote() - refs = [g.remote(x) for _ in range(10)] + [g.remote(x) for _ in range(10)] expected = { "RUNNING": 1.0, @@ -66,7 +67,8 @@ def g(x): "WAITING_FOR_DEPENDENCIES": 10.0, } wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + ) def test_actor_tasks_queued(shutdown_only): @@ -78,7 +80,7 @@ def f(self): time.sleep(999) a = F.remote() - refs = [a.f.remote() for _ in range(10)] + [a.f.remote() for _ in range(10)] expected = { "RUNNING": 1.0, @@ -88,7 +90,8 @@ def f(self): "FINISHED": 1.0, } wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + ) def test_task_finish(shutdown_only): @@ -102,7 +105,7 @@ def f(): def g(): assert False - refs = (f.remote(), g.remote()) + (f.remote(), g.remote()) expected = { "RUNNING": 0.0, @@ -112,7 +115,8 @@ def g(): "FINISHED": 2.0, } wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000) + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + ) # TODO(ekl) test wait on object store transfer (??) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 02fa2c9b4e6e..a56301c57e4a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2185,7 +2185,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, ReferenceCounter::ReferenceTableProto *borrowed_refs, bool *is_retryable_error) { ray::stats::STATS_tasks.Record(1, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)); - ray::stats::STATS_tasks.Record(-1, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); + ray::stats::STATS_tasks.Record( + -1, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString(); task_queue_length_ -= 1; num_executed_tasks_ += 1; @@ -2344,7 +2345,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } ray::stats::STATS_tasks.Record(0, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)); - ray::stats::STATS_tasks.Record(0, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); + ray::stats::STATS_tasks.Record( + 0, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); return status; } diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index c7c8b0ffa6ba..edd13c2f5d80 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -97,9 +97,8 @@ std::vector TaskManager::AddPendingTask( { absl::MutexLock lock(&mu_); - auto inserted = submissible_tasks_.emplace(spec.TaskId(), - TaskEntry(spec, max_retries, num_returns, - task_counter_)); + auto inserted = submissible_tasks_.emplace( + spec.TaskId(), TaskEntry(spec, max_retries, num_returns, task_counter_)); RAY_CHECK(inserted.second); num_pending_tasks_++; } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 44ce54dffd19..d8b9f1889c2d 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -30,20 +30,23 @@ namespace core { // TODO(ekl) move to cc file class TaskStatusCounter { - public: - void Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status) { - counters_[old_status] -= 1; - counters_[new_status] += 1; - RAY_CHECK(counters_[old_status] >= 0); - ray::stats::STATS_tasks.Record(counters_[old_status], rpc::TaskStatus_Name(old_status)); - ray::stats::STATS_tasks.Record(counters_[new_status], rpc::TaskStatus_Name(new_status)); - } - void Increment(rpc::TaskStatus status) { - counters_[status] += 1; - ray::stats::STATS_tasks.Record(counters_[status], rpc::TaskStatus_Name(status)); - } - private: - int64_t counters_[rpc::TaskStatus_ARRAYSIZE] = {}; + public: + void Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status) { + counters_[old_status] -= 1; + counters_[new_status] += 1; + RAY_CHECK(counters_[old_status] >= 0); + ray::stats::STATS_tasks.Record(counters_[old_status], + rpc::TaskStatus_Name(old_status)); + ray::stats::STATS_tasks.Record(counters_[new_status], + rpc::TaskStatus_Name(new_status)); + } + void Increment(rpc::TaskStatus status) { + counters_[status] += 1; + ray::stats::STATS_tasks.Record(counters_[status], rpc::TaskStatus_Name(status)); + } + + private: + int64_t counters_[rpc::TaskStatus_ARRAYSIZE] = {}; }; class TaskFinisherInterface { @@ -296,7 +299,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg, size_t num_returns, - TaskStatusCounter& counter) + TaskStatusCounter &counter) : spec(spec_arg), num_retries_left(num_retries_left_arg), counter(counter) { for (size_t i = 0; i < num_returns; i++) { reconstructable_return_ids.insert(spec.ReturnId(i)); @@ -309,9 +312,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa status = new_status; } - rpc::TaskStatus GetStatus() const { - return status; - } + rpc::TaskStatus GetStatus() const { return status; } bool IsPending() const { return status != rpc::TaskStatus::FINISHED; } @@ -335,7 +336,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // the task entry may be erased. int num_retries_left; // Reference to the task stats tracker. - TaskStatusCounter& counter; + TaskStatusCounter &counter; // Number of times this task successfully completed execution so far. int num_successful_executions = 0; // Objects returned by this task that are reconstructable. This is set @@ -356,9 +357,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // out-of-band. int64_t lineage_footprint_bytes = 0; - private: - // The task's current execution status. - rpc::TaskStatus status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; + private: + // The task's current execution status. + rpc::TaskStatus status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; }; /// Remove a lineage reference to this object ID. This should be called diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc index 53e80b6ec072..25c91c377b7f 100644 --- a/src/ray/stats/metric_defs.cc +++ b/src/ray/stats/metric_defs.cc @@ -32,12 +32,11 @@ namespace stats { /// python/ray/tests/test_metrics_agent.py to ensure that its existence is tested. /// Scheduler -DEFINE_stats( - tasks, - "Cumulative number of tasks currently in a particular state.", - ("State"), - (), - ray::stats::GAUGE); +DEFINE_stats(tasks, + "Cumulative number of tasks currently in a particular state.", + ("State"), + (), + ray::stats::GAUGE); /// Event stats DEFINE_stats(operation_count, "operation count", ("Method"), (), ray::stats::GAUGE); From 0a76be80f972e40dece54533511905d279da2c10 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 12 Sep 2022 15:59:53 -0700 Subject: [PATCH 07/17] add retry test Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index f87c607077dc..4464a83490ba 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -119,6 +119,28 @@ def g(): ) +def test_task_retry(shutdown_only): + info = ray.init(num_cpus=2) + + @ray.remote(retry_exceptions=True) + def f(): + assert False + + f.remote() + time.sleep(2) # Enough sleep so that retries have time to run. + + expected = { + "RUNNING": 0.0, + "WAITING_FOR_EXECUTION": 0.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + "FINISHED": 1.0, # Only recorded as finished once. + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + ) + + # TODO(ekl) test wait on object store transfer (??) From 235fc77a859831f5a416e702ab699ad8a300a082 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 12 Sep 2022 16:06:50 -0700 Subject: [PATCH 08/17] improve actor task test Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 4464a83490ba..8a7fe394d406 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -78,16 +78,20 @@ def test_actor_tasks_queued(shutdown_only): class F: def f(self): time.sleep(999) + def g(self): + pass a = F.remote() - [a.f.remote() for _ in range(10)] + [a.g.remote() for _ in range(10)] + [a.f.remote() for _ in range(1)] # Further tasks should be blocked on this one. + [a.g.remote() for _ in range(9)] expected = { "RUNNING": 1.0, "WAITING_FOR_EXECUTION": 9.0, "SCHEDULED": 0.0, "WAITING_FOR_DEPENDENCIES": 0.0, - "FINISHED": 1.0, + "FINISHED": 11.0, } wait_for_condition( lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 From 553cea9ba446fa58442ab8d111b1cdfa6334d7f5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 12 Sep 2022 16:16:37 -0700 Subject: [PATCH 09/17] add todo Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 8a7fe394d406..2b1a934e8a6b 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -85,6 +85,7 @@ def g(self): [a.g.remote() for _ in range(10)] [a.f.remote() for _ in range(1)] # Further tasks should be blocked on this one. [a.g.remote() for _ in range(9)] + # TODO: add actor vs task type label and name label? expected = { "RUNNING": 1.0, From 4bbc2f03ed165be080b351a58ae4dad83536aff5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 13 Sep 2022 12:51:57 -0700 Subject: [PATCH 10/17] speed up test Signed-off-by: Eric Liang --- python/ray/_private/prometheus_exporter.py | 1 - python/ray/_private/test_utils.py | 10 +++++++++- python/ray/tests/BUILD | 1 + python/ray/tests/test_task_metrics.py | 20 ++++++++++++++------ 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/python/ray/_private/prometheus_exporter.py b/python/ray/_private/prometheus_exporter.py index d5284c20d2b6..52890b816792 100644 --- a/python/ray/_private/prometheus_exporter.py +++ b/python/ray/_private/prometheus_exporter.py @@ -295,7 +295,6 @@ def emit(self, view_data): # pragma: NO COVER for v_data in view_data: if v_data.tag_value_aggregation_data_map: self.collector.add_view_data(v_data) - print("ADD VIEW DATA", v_data) def serve_http(self): """serve_http serves the Prometheus endpoint.""" diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index ea187b14daa3..a7c2e9c52802 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -820,7 +820,15 @@ def fetch_prometheus(prom_addresses): return components_dict, metric_names, metric_samples -def fetch_prometheus_metrics(prom_addresses): +def fetch_prometheus_metrics(prom_addresses: List[str]) -> Dict[str, List[Any]]: + """Return prometheus metrics from the given addresses. + + Args: + prom_addresses: List of metrics_agent addresses to collect metrics from. + + Returns: + Dict mapping from metric name to list of samples for the metric. + """ _, _, samples = fetch_prometheus(prom_addresses) samples_by_name = defaultdict(list) for sample in samples: diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index e2ffc54cf8f6..00393aa44e45 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -116,6 +116,7 @@ py_test_module_list( "test_memory_scheduling.py", "test_memory_pressure.py", "test_metrics.py", + "test_task_metrics.py", "test_multi_node.py", "test_multi_node_2.py", "test_multinode_failures.py", diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 2b1a934e8a6b..fa29b45a49d3 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -11,6 +11,13 @@ ) +METRIC_CONFIG = { + "_system_config": { + "metrics_report_interval_ms": 100, + } +} + + def tasks_by_state(info) -> dict: metrics_page = "localhost:{}".format(info["metrics_export_port"]) res = fetch_prometheus_metrics([metrics_page]) @@ -26,7 +33,7 @@ def tasks_by_state(info) -> dict: def test_task_basic(shutdown_only): - info = ray.init(num_cpus=2) + info = ray.init(num_cpus=2, **METRIC_CONFIG) @ray.remote def f(): @@ -47,7 +54,7 @@ def f(): def test_task_wait_on_deps(shutdown_only): - info = ray.init(num_cpus=2) + info = ray.init(num_cpus=2, **METRIC_CONFIG) @ray.remote def f(): @@ -72,12 +79,13 @@ def g(x): def test_actor_tasks_queued(shutdown_only): - info = ray.init(num_cpus=2) + info = ray.init(num_cpus=2, **METRIC_CONFIG) @ray.remote class F: def f(self): time.sleep(999) + def g(self): pass @@ -100,7 +108,7 @@ def g(self): def test_task_finish(shutdown_only): - info = ray.init(num_cpus=2) + info = ray.init(num_cpus=2, **METRIC_CONFIG) @ray.remote def f(): @@ -125,14 +133,14 @@ def g(): def test_task_retry(shutdown_only): - info = ray.init(num_cpus=2) + info = ray.init(num_cpus=2, **METRIC_CONFIG) @ray.remote(retry_exceptions=True) def f(): assert False f.remote() - time.sleep(2) # Enough sleep so that retries have time to run. + time.sleep(1) # Enough sleep so that retries have time to run. expected = { "RUNNING": 0.0, From 32f8267ed53d80723958b964de363c9edcd6bc7f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 13 Sep 2022 16:00:23 -0700 Subject: [PATCH 11/17] update Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 4 ++-- src/ray/core_worker/task_manager.cc | 20 ++++++++++++++++++++ src/ray/core_worker/task_manager.h | 24 ++++++++++-------------- src/ray/stats/metric_defs.cc | 2 +- 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index fa29b45a49d3..029f6844067d 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -20,6 +20,7 @@ def tasks_by_state(info) -> dict: metrics_page = "localhost:{}".format(info["metrics_export_port"]) + print("Fetch metrics from", metrics_page) res = fetch_prometheus_metrics([metrics_page]) if "ray_tasks" in res: states = defaultdict(int) @@ -28,7 +29,6 @@ def tasks_by_state(info) -> dict: print("Tasks by state: {}".format(states)) return states else: - print("No task metrics yet.") return {} @@ -65,7 +65,7 @@ def g(x): time.sleep(999) x = f.remote() - [g.remote(x) for _ in range(10)] + [g.remote(x) for _ in range(5)] expected = { "RUNNING": 1.0, diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index edd13c2f5d80..fb98d2a9b48f 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -28,6 +28,26 @@ const int64_t kTaskFailureThrottlingThreshold = 50; // Throttle task failure logs to once this interval. const int64_t kTaskFailureLoggingFrequencyMillis = 5000; +TaskStatusCounter::TaskStatusCounter() { + // TODO(ekl): this isn't actually resetting OpenCensus properly + for (int i = 0; i < rpc::TaskStatus_ARRAYSIZE; i++) { + ray::stats::STATS_tasks.Record(0, rpc::TaskStatus_Name(i)); + } +} + +void TaskStatusCounter::Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status) { + counters_[old_status] -= 1; + counters_[new_status] += 1; + RAY_CHECK(counters_[old_status] >= 0); + ray::stats::STATS_tasks.Record(counters_[old_status], rpc::TaskStatus_Name(old_status)); + ray::stats::STATS_tasks.Record(counters_[new_status], rpc::TaskStatus_Name(new_status)); +} + +void TaskStatusCounter::Increment(rpc::TaskStatus status) { + counters_[status] += 1; + ray::stats::STATS_tasks.Record(counters_[status], rpc::TaskStatus_Name(status)); +} + std::vector TaskManager::AddPendingTask( const rpc::Address &caller_address, const TaskSpecification &spec, diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index d8b9f1889c2d..5963beb790ef 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -28,22 +28,18 @@ namespace ray { namespace core { -// TODO(ekl) move to cc file +/// This class tracks the number of tasks at a particular state for the +/// purpose of emitting Prometheus metrics. class TaskStatusCounter { public: - void Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status) { - counters_[old_status] -= 1; - counters_[new_status] += 1; - RAY_CHECK(counters_[old_status] >= 0); - ray::stats::STATS_tasks.Record(counters_[old_status], - rpc::TaskStatus_Name(old_status)); - ray::stats::STATS_tasks.Record(counters_[new_status], - rpc::TaskStatus_Name(new_status)); - } - void Increment(rpc::TaskStatus status) { - counters_[status] += 1; - ray::stats::STATS_tasks.Record(counters_[status], rpc::TaskStatus_Name(status)); - } + /// Construct a new TaskStatusCounter. + TaskStatusCounter(); + + /// Track the change of the status of a task from old to new status. + void Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status); + + /// Increment the number of tasks at a specific status by one. + void Increment(rpc::TaskStatus status); private: int64_t counters_[rpc::TaskStatus_ARRAYSIZE] = {}; diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc index 25c91c377b7f..c5b3007af8f0 100644 --- a/src/ray/stats/metric_defs.cc +++ b/src/ray/stats/metric_defs.cc @@ -33,7 +33,7 @@ namespace stats { /// Scheduler DEFINE_stats(tasks, - "Cumulative number of tasks currently in a particular state.", + "Current number of tasks currently in a particular state.", ("State"), (), ray::stats::GAUGE); From b45ceba0fa0f842ef4e15419adbc5c280bcad207 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 14 Sep 2022 13:51:18 -0700 Subject: [PATCH 12/17] address asynci Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 36 +++++++++++++++++++------ src/ray/core_worker/core_worker.cc | 26 +++--------------- src/ray/core_worker/core_worker.h | 38 +++++++++++++++++++-------- src/ray/core_worker/task_manager.cc | 7 +---- 4 files changed, 59 insertions(+), 48 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 029f6844067d..750c299a0396 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -1,4 +1,5 @@ from collections import defaultdict +import asyncio import os import time @@ -47,9 +48,8 @@ def f(): "SCHEDULED": 8.0, "WAITING_FOR_DEPENDENCIES": 0.0, } - # TODO(ekl) optimize the reporting interval to be faster for testing wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 ) @@ -65,7 +65,7 @@ def g(x): time.sleep(999) x = f.remote() - [g.remote(x) for _ in range(5)] + [g.remote(x) for _ in range(10)] expected = { "RUNNING": 1.0, @@ -74,7 +74,7 @@ def g(x): "WAITING_FOR_DEPENDENCIES": 10.0, } wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 ) @@ -103,7 +103,7 @@ def g(self): "FINISHED": 11.0, } wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 ) @@ -128,7 +128,7 @@ def g(): "FINISHED": 2.0, } wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 ) @@ -150,11 +150,31 @@ def f(): "FINISHED": 1.0, # Only recorded as finished once. } wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=2000 + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 ) -# TODO(ekl) test wait on object store transfer (??) +def test_concurrent_actor_tasks(shutdown_only): + info = ray.init(num_cpus=2, **METRIC_CONFIG) + + @ray.remote(max_concurrency=30) + class A: + async def f(self): + await asyncio.sleep(300) + + a = A.remote() + [a.f.remote() for _ in range(40)] + + expected = { + "RUNNING": 30.0, + "WAITING_FOR_EXECUTION": 10.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + "FINISHED": 1.0, + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 + ) if __name__ == "__main__": diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a56301c57e4a..78fb70825bd4 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2184,20 +2184,13 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrowed_refs, bool *is_retryable_error) { - ray::stats::STATS_tasks.Record(1, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)); - ray::stats::STATS_tasks.Record( - -1, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString(); task_queue_length_ -= 1; num_executed_tasks_ += 1; // Modify the worker's per function counters. std::string func_name = task_spec.FunctionDescriptor()->CallString(); - { - absl::MutexLock l(&task_counter_.tasks_counter_mutex_); - task_counter_.Add(TaskCounter::kPending, func_name, -1); - task_counter_.Add(TaskCounter::kRunning, func_name, 1); - } + task_counter_.MovePendingToRunning(func_name); if (!options_.is_local_mode) { worker_context_.SetCurrentTask(task_spec); @@ -2313,13 +2306,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } } - // Modify the worker's per function counters. - { - absl::MutexLock l(&task_counter_.tasks_counter_mutex_); - task_counter_.Add(TaskCounter::kRunning, func_name, -1); - task_counter_.Add(TaskCounter::kFinished, func_name, 1); - } - + task_counter_.MoveRunningToFinished(func_name); RAY_LOG(DEBUG) << "Finished executing task " << task_spec.TaskId() << ", status=" << status; @@ -2343,10 +2330,6 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } else if (!status.ok()) { RAY_LOG(FATAL) << "Unexpected task status type : " << status; } - - ray::stats::STATS_tasks.Record(0, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)); - ray::stats::STATS_tasks.Record( - 0, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); return status; } @@ -2560,10 +2543,7 @@ void CoreWorker::HandlePushTask(const rpc::PushTaskRequest &request, std::string func_name = FunctionDescriptorBuilder::FromProto(request.task_spec().function_descriptor()) ->CallString(); - { - absl::MutexLock l(&task_counter_.tasks_counter_mutex_); - task_counter_.Add(TaskCounter::kPending, func_name, 1); - } + task_counter_.IncPending(func_name); // For actor tasks, we just need to post a HandleActorTask instance to the task // execution service. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 3bf232cd83a9..be4bf4f288ee 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1277,18 +1277,34 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { GUARDED_BY(tasks_counter_mutex_); absl::flat_hash_map finished_tasks_counter_map_ GUARDED_BY(tasks_counter_mutex_); + int64_t running_total_ = 0 GUARDED_BY(tasks_counter_mutex_); - void Add(TaskStatusType type, const std::string &func_name, int value) { - tasks_counter_mutex_.AssertHeld(); - if (type == kPending) { - pending_tasks_counter_map_[func_name] += value; - } else if (type == kRunning) { - running_tasks_counter_map_[func_name] += value; - } else if (type == kFinished) { - finished_tasks_counter_map_[func_name] += value; - } else { - RAY_CHECK(false) << "This line should not be reached."; - } + void IncPending(const std::string &func_name) { + absl::MutexLock l(&tasks_counter_mutex_); + pending_tasks_counter_map_[func_name] += 1; + } + + void MovePendingToRunning(const std::string &func_name) { + absl::MutexLock l(&tasks_counter_mutex_); + pending_tasks_counter_map_[func_name] -= 1; + running_tasks_counter_map_[func_name] += 1; + running_total_ += 1; + UpdateStats(); + } + + void MoveRunningToFinished(const std::string &func_name) { + absl::MutexLock l(&tasks_counter_mutex_); + running_tasks_counter_map_[func_name] -= 1; + finished_tasks_counter_map_[func_name] += 1; + running_total_ -= 1; + UpdateStats(); + } + + void UpdateStats() { + ray::stats::STATS_tasks.Record(running_total_, + rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)); + ray::stats::STATS_tasks.Record( + -running_total_, rpc::TaskStatus_Name(rpc::TaskStatus::WAITING_FOR_EXECUTION)); } }; TaskCounter task_counter_; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index fb98d2a9b48f..ff0a2d39f4ab 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -28,12 +28,7 @@ const int64_t kTaskFailureThrottlingThreshold = 50; // Throttle task failure logs to once this interval. const int64_t kTaskFailureLoggingFrequencyMillis = 5000; -TaskStatusCounter::TaskStatusCounter() { - // TODO(ekl): this isn't actually resetting OpenCensus properly - for (int i = 0; i < rpc::TaskStatus_ARRAYSIZE; i++) { - ray::stats::STATS_tasks.Record(0, rpc::TaskStatus_Name(i)); - } -} +TaskStatusCounter::TaskStatusCounter() {} void TaskStatusCounter::Swap(rpc::TaskStatus old_status, rpc::TaskStatus new_status) { counters_[old_status] -= 1; From 42237f79aa507885bc5b347be2d078fca206bc51 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 14 Sep 2022 17:23:03 -0700 Subject: [PATCH 13/17] fix lint Signed-off-by: Eric Liang --- src/ray/core_worker/core_worker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index be4bf4f288ee..3cfb648b959e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1277,7 +1277,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { GUARDED_BY(tasks_counter_mutex_); absl::flat_hash_map finished_tasks_counter_map_ GUARDED_BY(tasks_counter_mutex_); - int64_t running_total_ = 0 GUARDED_BY(tasks_counter_mutex_); + int64_t running_total_ GUARDED_BY(tasks_counter_mutex_) = 0; void IncPending(const std::string &func_name) { absl::MutexLock l(&tasks_counter_mutex_); From 2c977091c119c822b55a8e2eea24d9a2dd61f6cf Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 14 Sep 2022 18:16:19 -0700 Subject: [PATCH 14/17] export Signed-off-by: Eric Liang --- src/ray/common/ray_config_def.h | 2 +- src/ray/core_worker/core_worker.cc | 4 ++++ thirdparty/patches/opencensus-cpp-shutdown-api.patch | 9 +++++++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 0ce2a195ea56..78842f936bf5 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -460,7 +460,7 @@ RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000) RAY_CONFIG(int64_t, num_workers_soft_limit, -1) // The interval where metrics are exported in milliseconds. -RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000) +RAY_CONFIG(uint64_t, metrics_report_interval_ms, 5000) /// Enable the task timeline. If this is enabled, certain events such as task /// execution are profiled and sent to the GCS. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 78fb70825bd4..57eaa964ccae 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -539,6 +539,7 @@ void CoreWorker::Shutdown() { } RAY_LOG(INFO) << "Shutting down a core worker."; + is_shutdown_ = true; if (options_.worker_type == WorkerType::WORKER) { // Running in a main thread. @@ -590,6 +591,9 @@ void CoreWorker::Disconnect( const rpc::WorkerExitType &exit_type, const std::string &exit_detail, const std::shared_ptr &creation_task_exception_pb_bytes) { + RAY_LOG(ERROR) << "Forcing export now [Disconnect]"; + opencensus::stats::StatsExporter::ExportNow(); + RAY_LOG(ERROR) << "Export now, done"; if (connected_) { RAY_LOG(INFO) << "Disconnecting to the raylet."; connected_ = false; diff --git a/thirdparty/patches/opencensus-cpp-shutdown-api.patch b/thirdparty/patches/opencensus-cpp-shutdown-api.patch index 5fa92a206385..42663af6861a 100644 --- a/thirdparty/patches/opencensus-cpp-shutdown-api.patch +++ b/thirdparty/patches/opencensus-cpp-shutdown-api.patch @@ -85,7 +85,7 @@ diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal index 43ddbc7..37b4ae1 100644 --- opencensus/stats/internal/stats_exporter.cc +++ opencensus/stats/internal/stats_exporter.cc -@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() { +@@ -95,25 +95,56 @@ void StatsExporterImpl::ClearHandlersForTesting() { } void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) { @@ -138,6 +138,10 @@ index 43ddbc7..37b4ae1 100644 + StatsExporterImpl::Get()->Shutdown(); + StatsExporterImpl::Get()->ClearHandlersForTesting(); +} ++ ++void StatsExporter::ExportNow() { ++ StatsExporterImpl::Get()->Export(); ++} + // static void StatsExporter::SetInterval(absl::Duration interval) { @@ -158,11 +162,12 @@ diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h index 6756858..65e0262 100644 --- opencensus/stats/stats_exporter.h +++ opencensus/stats/stats_exporter.h -@@ -45,6 +45,8 @@ class StatsExporter final { +@@ -45,6 +45,9 @@ class StatsExporter final { // Removes the view with 'name' from the registry, if one is registered. static void RemoveView(absl::string_view name); + static void Shutdown(); ++ static void ExportNow(); + // StatsExporter::Handler is the interface for push exporters that export // recorded data for registered views. The exporter should provide a static From ff96358bcb4794ec5faa2bac5ea2872c2befa4c9 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 15 Sep 2022 14:10:23 -0700 Subject: [PATCH 15/17] add export now test Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 35 ++++++++++++++++++++++++++- src/ray/core_worker/core_worker.cc | 4 +-- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index a1cf090c3366..f72856551673 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -7,6 +7,7 @@ from ray._private.test_utils import ( fetch_prometheus_metrics, + run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition, ) @@ -58,13 +59,45 @@ def f(): "SCHEDULED": 8.0, "WAITING_FOR_DEPENDENCIES": 0.0, } - # TODO(ekl) optimize the reporting interval to be faster for testing wait_for_condition( lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 ) proc.kill() +def test_metrics_export_now(shutdown_only): + info = ray.init(num_cpus=2, **METRIC_CONFIG) + + driver = """ +import ray +import time + +ray.init("auto") + +@ray.remote +def f(): + pass +a = [f.remote() for _ in range(10)] +ray.get(a) +""" + + # If force export at process death is broken, we won't see the recently completed + # tasks from the drivers. + for _ in range(10): + run_string_as_driver(driver) + + expected = { + "RUNNING": 0.0, + "WAITING_FOR_EXECUTION": 0.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + "FINISHED": 100.0, + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 + ) + + def test_task_wait_on_deps(shutdown_only): info = ray.init(num_cpus=2, **METRIC_CONFIG) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 57eaa964ccae..31b57301f188 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -539,7 +539,6 @@ void CoreWorker::Shutdown() { } RAY_LOG(INFO) << "Shutting down a core worker."; - is_shutdown_ = true; if (options_.worker_type == WorkerType::WORKER) { // Running in a main thread. @@ -591,9 +590,8 @@ void CoreWorker::Disconnect( const rpc::WorkerExitType &exit_type, const std::string &exit_detail, const std::shared_ptr &creation_task_exception_pb_bytes) { - RAY_LOG(ERROR) << "Forcing export now [Disconnect]"; + // Force stats export before exiting the worker. opencensus::stats::StatsExporter::ExportNow(); - RAY_LOG(ERROR) << "Export now, done"; if (connected_) { RAY_LOG(INFO) << "Disconnecting to the raylet."; connected_ = false; From 8e4bc916eb7ae957da9bc85e6f25f733939b05b6 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 16 Sep 2022 12:43:24 -0700 Subject: [PATCH 16/17] increase reporting interval for test Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index f72856551673..630716ee2265 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -19,6 +19,12 @@ } } +SLOW_METRIC_CONFIG = { + "_system_config": { + "metrics_report_interval_ms": 3000, + } +} + def tasks_by_state(info) -> dict: metrics_page = "localhost:{}".format(info["metrics_export_port"]) @@ -66,7 +72,7 @@ def f(): def test_metrics_export_now(shutdown_only): - info = ray.init(num_cpus=2, **METRIC_CONFIG) + info = ray.init(num_cpus=2, **SLOW_METRIC_CONFIG) driver = """ import ray @@ -83,8 +89,10 @@ def f(): # If force export at process death is broken, we won't see the recently completed # tasks from the drivers. - for _ in range(10): + for i in range(10): + print("Run job", i) run_string_as_driver(driver) + tasks_by_state(info) expected = { "RUNNING": 0.0, From 6e5e988c0df01a5b7b73715fc126ccc64341de13 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 16 Sep 2022 12:44:44 -0700 Subject: [PATCH 17/17] move test Signed-off-by: Eric Liang --- python/ray/tests/test_task_metrics.py | 70 +++++++++++++-------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 630716ee2265..481e9da25d5b 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -71,41 +71,6 @@ def f(): proc.kill() -def test_metrics_export_now(shutdown_only): - info = ray.init(num_cpus=2, **SLOW_METRIC_CONFIG) - - driver = """ -import ray -import time - -ray.init("auto") - -@ray.remote -def f(): - pass -a = [f.remote() for _ in range(10)] -ray.get(a) -""" - - # If force export at process death is broken, we won't see the recently completed - # tasks from the drivers. - for i in range(10): - print("Run job", i) - run_string_as_driver(driver) - tasks_by_state(info) - - expected = { - "RUNNING": 0.0, - "WAITING_FOR_EXECUTION": 0.0, - "SCHEDULED": 0.0, - "WAITING_FOR_DEPENDENCIES": 0.0, - "FINISHED": 100.0, - } - wait_for_condition( - lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 - ) - - def test_task_wait_on_deps(shutdown_only): info = ray.init(num_cpus=2, **METRIC_CONFIG) @@ -276,6 +241,41 @@ async def f(self): proc.kill() +def test_metrics_export_now(shutdown_only): + info = ray.init(num_cpus=2, **SLOW_METRIC_CONFIG) + + driver = """ +import ray +import time + +ray.init("auto") + +@ray.remote +def f(): + pass +a = [f.remote() for _ in range(10)] +ray.get(a) +""" + + # If force export at process death is broken, we won't see the recently completed + # tasks from the drivers. + for i in range(10): + print("Run job", i) + run_string_as_driver(driver) + tasks_by_state(info) + + expected = { + "RUNNING": 0.0, + "WAITING_FOR_EXECUTION": 0.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + "FINISHED": 100.0, + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 + ) + + if __name__ == "__main__": import sys