Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metrics] Force export census metrics on worker death #28547

Merged
merged 20 commits into from
Sep 16, 2022
Merged
35 changes: 34 additions & 1 deletion python/ray/tests/test_task_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ray._private.test_utils import (
fetch_prometheus_metrics,
run_string_as_driver,
run_string_as_driver_nonblocking,
wait_for_condition,
)
Expand Down Expand Up @@ -58,13 +59,45 @@ def f():
"SCHEDULED": 8.0,
"WAITING_FOR_DEPENDENCIES": 0.0,
}
# TODO(ekl) optimize the reporting interval to be faster for testing
wait_for_condition(
lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500
)
proc.kill()


def test_metrics_export_now(shutdown_only):
info = ray.init(num_cpus=2, **METRIC_CONFIG)
ericl marked this conversation as resolved.
Show resolved Hide resolved

driver = """
import ray
import time

ray.init("auto")

@ray.remote
def f():
pass
a = [f.remote() for _ in range(10)]
ray.get(a)
"""

# If force export at process death is broken, we won't see the recently completed
# tasks from the drivers.
for _ in range(10):
run_string_as_driver(driver)

expected = {
"RUNNING": 0.0,
"WAITING_FOR_EXECUTION": 0.0,
"SCHEDULED": 0.0,
"WAITING_FOR_DEPENDENCIES": 0.0,
"FINISHED": 100.0,
}
wait_for_condition(
lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500
)


def test_task_wait_on_deps(shutdown_only):
info = ray.init(num_cpus=2, **METRIC_CONFIG)

Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000)
RAY_CONFIG(int64_t, num_workers_soft_limit, -1)

// The interval where metrics are exported in milliseconds.
RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000)
RAY_CONFIG(uint64_t, metrics_report_interval_ms, 5000)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelatedly, it seems 5 seconds is probably safe for now.


/// Enable the task timeline. If this is enabled, certain events such as task
/// execution are profiled and sent to the GCS.
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ void CoreWorker::Disconnect(
const rpc::WorkerExitType &exit_type,
const std::string &exit_detail,
const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
// Force stats export before exiting the worker.
opencensus::stats::StatsExporter::ExportNow();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method thread safe?

Also, technically this can lose data because ExportNow() is not a blocking call (so if metrics_agent_io_service stops before we send a RPC, it can lose data). I guess the probably is low, and it might be okay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I checked here and OCL acquires a mutex internally.

Regarding the RPC safety, I think at least the RPC initiation is a blocking call. I'm not sure if we wait for a reply though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. If the initiation is a blocking call, I think it is pretty safe although we are not waiting for a reply...

if (connected_) {
RAY_LOG(INFO) << "Disconnecting to the raylet.";
connected_ = false;
Expand Down
9 changes: 7 additions & 2 deletions thirdparty/patches/opencensus-cpp-shutdown-api.patch
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal
index 43ddbc7..37b4ae1 100644
--- opencensus/stats/internal/stats_exporter.cc
+++ opencensus/stats/internal/stats_exporter.cc
@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() {
@@ -95,25 +95,56 @@ void StatsExporterImpl::ClearHandlersForTesting() {
}

void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
Expand Down Expand Up @@ -138,6 +138,10 @@ index 43ddbc7..37b4ae1 100644
+ StatsExporterImpl::Get()->Shutdown();
+ StatsExporterImpl::Get()->ClearHandlersForTesting();
+}
+
+void StatsExporter::ExportNow() {
+ StatsExporterImpl::Get()->Export();
+}
+
// static
void StatsExporter::SetInterval(absl::Duration interval) {
Expand All @@ -158,11 +162,12 @@ diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h
index 6756858..65e0262 100644
--- opencensus/stats/stats_exporter.h
+++ opencensus/stats/stats_exporter.h
@@ -45,6 +45,8 @@ class StatsExporter final {
@@ -45,6 +45,9 @@ class StatsExporter final {
// Removes the view with 'name' from the registry, if one is registered.
static void RemoveView(absl::string_view name);

+ static void Shutdown();
+ static void ExportNow();
+
// StatsExporter::Handler is the interface for push exporters that export
// recorded data for registered views. The exporter should provide a static
Expand Down