From b28a47c84fc38ea0d884a7f98e18016b69fb9e22 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 16 Sep 2022 16:11:25 -0700 Subject: [PATCH] [metrics] Force export census metrics on worker death (#28547) Signed-off-by: PaulFenton --- python/ray/tests/test_task_metrics.py | 43 ++++++++++++++++++- src/ray/common/ray_config_def.h | 2 +- src/ray/core_worker/core_worker.cc | 2 + .../patches/opencensus-cpp-shutdown-api.patch | 9 +++- 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 9030832dfb7d..979905809184 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -7,6 +7,7 @@ from ray._private.test_utils import ( fetch_prometheus_metrics, + run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition, ) @@ -18,6 +19,12 @@ } } +SLOW_METRIC_CONFIG = { + "_system_config": { + "metrics_report_interval_ms": 3000, + } +} + def tasks_by_state(info) -> dict: metrics_page = "localhost:{}".format(info["metrics_export_port"]) @@ -58,7 +65,6 @@ def f(): "SCHEDULED": 8.0, "WAITING_FOR_DEPENDENCIES": 0.0, } - # TODO(ekl) optimize the reporting interval to be faster for testing wait_for_condition( lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 ) @@ -270,6 +276,41 @@ async def f(self): proc.kill() +def test_metrics_export_now(shutdown_only): + info = ray.init(num_cpus=2, **SLOW_METRIC_CONFIG) + + driver = """ +import ray +import time + +ray.init("auto") + +@ray.remote +def f(): + pass +a = [f.remote() for _ in range(10)] +ray.get(a) +""" + + # If force export at process death is broken, we won't see the recently completed + # tasks from the drivers. + for i in range(10): + print("Run job", i) + run_string_as_driver(driver) + tasks_by_state(info) + + expected = { + "RUNNING": 0.0, + "WAITING_FOR_EXECUTION": 0.0, + "SCHEDULED": 0.0, + "WAITING_FOR_DEPENDENCIES": 0.0, + "FINISHED": 100.0, + } + wait_for_condition( + lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500 + ) + + if __name__ == "__main__": import sys diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 0ce2a195ea56..78842f936bf5 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -460,7 +460,7 @@ RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000) RAY_CONFIG(int64_t, num_workers_soft_limit, -1) // The interval where metrics are exported in milliseconds. -RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000) +RAY_CONFIG(uint64_t, metrics_report_interval_ms, 5000) /// Enable the task timeline. If this is enabled, certain events such as task /// execution are profiled and sent to the GCS. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 78fb70825bd4..31b57301f188 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -590,6 +590,8 @@ void CoreWorker::Disconnect( const rpc::WorkerExitType &exit_type, const std::string &exit_detail, const std::shared_ptr &creation_task_exception_pb_bytes) { + // Force stats export before exiting the worker. + opencensus::stats::StatsExporter::ExportNow(); if (connected_) { RAY_LOG(INFO) << "Disconnecting to the raylet."; connected_ = false; diff --git a/thirdparty/patches/opencensus-cpp-shutdown-api.patch b/thirdparty/patches/opencensus-cpp-shutdown-api.patch index 5fa92a206385..42663af6861a 100644 --- a/thirdparty/patches/opencensus-cpp-shutdown-api.patch +++ b/thirdparty/patches/opencensus-cpp-shutdown-api.patch @@ -85,7 +85,7 @@ diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal index 43ddbc7..37b4ae1 100644 --- opencensus/stats/internal/stats_exporter.cc +++ opencensus/stats/internal/stats_exporter.cc -@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() { +@@ -95,25 +95,56 @@ void StatsExporterImpl::ClearHandlersForTesting() { } void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) { @@ -138,6 +138,10 @@ index 43ddbc7..37b4ae1 100644 + StatsExporterImpl::Get()->Shutdown(); + StatsExporterImpl::Get()->ClearHandlersForTesting(); +} ++ ++void StatsExporter::ExportNow() { ++ StatsExporterImpl::Get()->Export(); ++} + // static void StatsExporter::SetInterval(absl::Duration interval) { @@ -158,11 +162,12 @@ diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h index 6756858..65e0262 100644 --- opencensus/stats/stats_exporter.h +++ opencensus/stats/stats_exporter.h -@@ -45,6 +45,8 @@ class StatsExporter final { +@@ -45,6 +45,9 @@ class StatsExporter final { // Removes the view with 'name' from the registry, if one is registered. static void RemoveView(absl::string_view name); + static void Shutdown(); ++ static void ExportNow(); + // StatsExporter::Handler is the interface for push exporters that export // recorded data for registered views. The exporter should provide a static