[CoreWorker] Partially address Ray child process leaks by killing all child processes in the CoreWorker shutdown sequence. (ray-project#33976)

We kill all child processes when a Ray worker process exits. This addresses process leaks that caused GPU OOM errors in ray-project#31451. There is some risk to this PR, particularly if Ray users rely on Ray's existing behavior of leaking processes. We don't know of any such user, but we add a new flag, RAY_kill_child_processes_on_worker_exit, as a workaround in case someone is impacted.
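
For illustration only (not part of this diff): a user who depends on the old behavior could opt out by setting the flag before Ray starts. The snippet below is a hypothetical sketch that assumes the usual RAY_<config_name> environment-variable override and a locally started cluster.

import os

# Hypothetical opt-out sketch: keep Ray's old behavior of leaving child
# processes alive. Assumes the RAY_<config_name> environment-variable override
# is inherited by the Ray processes started from this session.
os.environ["RAY_kill_child_processes_on_worker_exit"] = "false"

import ray

ray.init()  # Workers launched for this local cluster inherit the override.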

Signed-off-by: Jack He <[email protected]>
cadedaniel authored and ProjectsByJackHe committed May 4, 2023
1 parent c67cd63 commit b965793
Showing 7 changed files with 307 additions and 0 deletions.
103 changes: 103 additions & 0 deletions python/ray/tests/test_failure_3.py
@@ -2,15 +2,20 @@
import sys
import signal
import threading
import json
from pathlib import Path

import ray
import numpy as np
import pytest
import psutil
import time

from ray._private.test_utils import (
    SignalActor,
    wait_for_pid_to_exit,
    wait_for_condition,
    run_string_as_driver_nonblocking,
)

SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@@ -347,6 +352,104 @@ def pid(self):
ray.get(t)


@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
    """
    Verify that processes created by Ray tasks and actors are
    cleaned up after a Ctrl+C is sent to the driver. This is done by
    creating an actor and task that each spawn a number of child
    processes, sending a SIGINT to the driver process, and
    verifying that all child processes are killed.
    The driver script uses a temporary JSON file to communicate
    the list of PIDs that are children of the Ray worker
    processes.
    """

    output_file_path = tmp_path / "leaked_pids.json"
    driver_script = f"""
import ray
import json
import multiprocessing
import shutil
import time
import os

@ray.remote
class Actor:
    def create_leaked_child_process(self, num_to_leak):
        print("Creating leaked process", os.getpid())
        pids = []
        for _ in range(num_to_leak):
            proc = multiprocessing.Process(
                target=time.sleep,
                args=(1000,),
                daemon=True,
            )
            proc.start()
            pids.append(proc.pid)
        return pids

@ray.remote
def task():
    print("Creating leaked process", os.getpid())
    proc = multiprocessing.Process(
        target=time.sleep,
        args=(1000,),
        daemon=True,
    )
    proc.start()
    return proc.pid

num_to_leak_per_type = 10
actor = Actor.remote()
actor_leaked_pids = ray.get(actor.create_leaked_child_process.remote(
    num_to_leak=num_to_leak_per_type,
))
task_leaked_pids = ray.get([task.remote() for _ in range(num_to_leak_per_type)])
leaked_pids = actor_leaked_pids + task_leaked_pids

final_file = "{output_file_path}"
tmp_file = final_file + ".tmp"
with open(tmp_file, "w") as f:
    json.dump(leaked_pids, f)
shutil.move(tmp_file, final_file)

while True:
    print(os.getpid())
    time.sleep(1)
"""

    driver_proc = run_string_as_driver_nonblocking(driver_script)

    # Wait for the JSON file containing the child PIDs
    # to be present.
    wait_for_condition(
        condition_predictor=lambda: Path(output_file_path).exists(),
        timeout=30,
    )

    # Load the PIDs of the child processes.
    with open(output_file_path, "r") as f:
        pids = json.load(f)

    # Validate that all children of the worker processes are in a sleeping state.
    processes = [psutil.Process(pid) for pid in pids]
    assert all([proc.status() == psutil.STATUS_SLEEPING for proc in processes])

    # Validate that children of the worker processes die after SIGINT.
    driver_proc.send_signal(signal.SIGINT)
    wait_for_condition(
        condition_predictor=lambda: all([not proc.is_running() for proc in processes]),
        timeout=30,
    )


if __name__ == "__main__":
    import pytest

11 changes: 11 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -789,3 +789,14 @@ RAY_CONFIG(bool, kill_idle_workers_of_terminated_job, true)
// If left empty, no such attempt will be made.
// Example: RAY_preload_python_modules=tensorflow,pytorch
RAY_CONFIG(std::vector<std::string>, preload_python_modules, {})

// Instruct the CoreWorker to kill its child processes when
// it exits. This prevents certain classes of resource leaks
// caused by worker processes leaving child processes behind.
// If a user relies on Ray's old behavior of leaking processes,
// then they can disable this behavior with
// RAY_kill_child_processes_on_worker_exit=false. We anticipate
// keeping this flag around at least until Ray 2.5.
// See https://github.com/ray-project/ray/pull/33976 for more
// info.
RAY_CONFIG(bool, kill_child_processes_on_worker_exit, true)
54 changes: 54 additions & 0 deletions src/ray/core_worker/core_worker.cc
@@ -704,6 +704,56 @@ void CoreWorker::Disconnect(
}
}

void CoreWorker::KillChildProcs() {
// There are cases where worker processes can "leak" child processes.
// Basically this means that the worker process (either itself, or via
// code in a task or actor) spawned a process and did not kill it on termination.
// The process will continue living beyond the lifetime of the worker process.
// If that leaked process has expensive resources, such as a CUDA context and associated
// GPU memory, then those resources will never be cleaned up until something else kills
// the process.
//
// This function lists all processes that are direct children of the current worker
// process, then kills them. This currently only works for the "happy-path"; worker
// process crashes will still leak processes.
// TODO(cade) Use more robust method to catch leaked processes even in worker crash
// scenarios (subreaper).

if (!RayConfig::instance().kill_child_processes_on_worker_exit()) {
RAY_LOG(DEBUG)
<< "kill_child_processes_on_worker_exit is not true, skipping KillChildProcs";
return;
}

RAY_LOG(DEBUG) << "kill_child_processes_on_worker_exit true, KillChildProcs";
auto maybe_child_procs = GetAllProcsWithPpid(GetPID());

// Enumerating child procs is not supported on this platform.
if (!maybe_child_procs) {
RAY_LOG(DEBUG) << "Killing leaked procs not supported on this platform.";
return;
}

const auto &child_procs = *maybe_child_procs;
const auto child_procs_str = absl::StrJoin(child_procs, ",");
RAY_LOG(INFO) << "Try killing all child processes of this worker as it exits. "
<< "Child process pids: " << child_procs_str;

for (const auto &child_pid : child_procs) {
auto maybe_error_code = KillProc(child_pid);
RAY_CHECK(maybe_error_code)
<< "Expected this path to only be called when KillProc is supported.";
auto error_code = *maybe_error_code;

RAY_LOG(INFO) << "Kill result for child pid " << child_pid << ": "
<< error_code.message() << ", bool " << (bool)error_code;
if (error_code) {
RAY_LOG(WARNING) << "Unable to kill potentially leaked process " << child_pid
<< ": " << error_code.message();
}
}
}

void CoreWorker::Exit(
const rpc::WorkerExitType exit_type,
const std::string &detail,
@@ -734,6 +784,7 @@ void CoreWorker::Exit(
creation_task_exception_pb_bytes]() {
rpc::DrainAndResetServerCallExecutor();
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
KillChildProcs();
Shutdown();
},
"CoreWorker.Shutdown");
@@ -778,6 +829,9 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type,
RAY_LOG(WARNING) << "Force exit the process. "
<< " Details: " << detail;
Disconnect(exit_type, detail);

KillChildProcs();

// NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
// `exit()` will destruct static objects in an incorrect order, which will lead to
// core dumps.
7 changes: 7 additions & 0 deletions src/ray/core_worker/core_worker.h
@@ -1212,6 +1212,13 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param exit_detail The detailed reason for a given exit.
void ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail);

/// Forcefully kill child processes. User code running in actors or tasks
/// can spawn processes that don't get terminated. If those processes
/// own resources (such as GPU memory), then those resources will become
/// unavailable until the process is killed.
/// This is called during shutdown of the process.
void KillChildProcs();

/// Register this worker or driver to GCS.
void RegisterToGcs(int64_t worker_launch_time_ms, int64_t worker_launched_time_ms);

81 changes: 81 additions & 0 deletions src/ray/util/process.cc
@@ -636,6 +636,87 @@ bool IsProcessAlive(pid_t pid) {
#endif
}

#if defined(__linux__)
static inline std::error_code KillProcLinux(pid_t pid) {
std::error_code error;
if (kill(pid, SIGKILL) != 0) {
error = std::error_code(errno, std::system_category());
}
return error;
}
#endif

std::optional<std::error_code> KillProc(pid_t pid) {
#if defined(__linux__)
return {KillProcLinux(pid)};
#else
return std::nullopt;
#endif
}

#if defined(__linux__)
static inline std::vector<pid_t> GetAllProcsWithPpidLinux(pid_t parent_pid) {
std::vector<pid_t> child_pids;

// Iterate over all files in the /proc directory, looking for directories.
// See `man proc` for information on the directory structure.
// Directories with only digits in their name correspond to processes in the process
// table. We read in the status of each such process and parse the parent PID. If the
// process parent PID is equal to parent_pid, then we add it to the vector to be
// returned. Ideally, we would use a library for this, but at the time of writing none is
// available in Ray C++.

std::filesystem::directory_iterator dir(kProcDirectory);
for (const auto &file : dir) {
if (!file.is_directory()) {
continue;
}

// Determine if the directory name consists of only digits (means it's a PID).
const auto filename = file.path().filename().string();
bool file_name_is_only_digit =
std::all_of(filename.begin(), filename.end(), ::isdigit);
if (!file_name_is_only_digit) {
continue;
}

// If so, open the status file for reading.
pid_t pid = std::stoi(filename);
std::ifstream status_file(file.path() / "status");
if (!status_file.is_open()) {
continue;
}

// Scan for the line that starts with the ppid key.
std::string line;
const std::string key = "PPid:";
while (std::getline(status_file, line)) {
const auto substr = line.substr(0, key.size());
if (substr != key) {
continue;
}

// We found it, read and parse the PPID.
pid_t ppid = std::stoi(line.substr(substr.size()));
if (ppid == parent_pid) {
child_pids.push_back(pid);
}
break;
}
}

return child_pids;
}
#endif

std::optional<std::vector<pid_t>> GetAllProcsWithPpid(pid_t parent_pid) {
#if defined(__linux__)
return {GetAllProcsWithPpidLinux(parent_pid)};
#else
return std::nullopt;
#endif
}

} // namespace ray

namespace std {
15 changes: 15 additions & 0 deletions src/ray/util/process.h
@@ -23,6 +23,7 @@
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <system_error>
#include <utility>
@@ -120,6 +121,20 @@ bool IsParentProcessAlive();

bool IsProcessAlive(pid_t pid);

static constexpr char kProcDirectory[] = "/proc";

// Platform-specific kill for the specified process identifier.
// Currently only supported on Linux. Returns nullopt for other platforms.
std::optional<std::error_code> KillProc(pid_t pid);

// Platform-specific utility to find the process IDs of all processes
// that have the specified parent_pid as their parent.
// In other words, find all immediate children of the specified process
// id.
//
// Currently only supported on Linux. Returns nullopt on other platforms.
std::optional<std::vector<pid_t>> GetAllProcsWithPpid(pid_t parent_pid);

} // namespace ray

// We only define operators required by the standard library (==, hash):
36 changes: 36 additions & 0 deletions src/ray/util/util_test.cc
@@ -21,6 +21,7 @@
#include <chrono>
#include <thread>

#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ray/util/logging.h"
#include "ray/util/process.h"
@@ -205,6 +206,41 @@ TEST(UtilTest, IsProcessAlive) {
RAY_CHECK(!IsProcessAlive(pid));
}

TEST(UtilTest, GetAllProcsWithPpid) {
#if defined(__linux__)
// Verify correctness by spawning several child processes,
// then asserting that each PID is present in the output.

namespace bp = boost::process;

std::vector<bp::child> actual_child_procs;

for (int i = 0; i < 10; ++i) {
actual_child_procs.push_back(bp::child("bash"));
}

std::optional<std::vector<pid_t>> maybe_child_procs = GetAllProcsWithPpid(GetPID());

// Assert optional has value.
ASSERT_EQ(static_cast<bool>(maybe_child_procs), true);

// Assert each actual process ID is contained in the returned vector.
auto child_procs = *maybe_child_procs;
for (auto &child_proc : actual_child_procs) {
pid_t pid = child_proc.id();
EXPECT_THAT(child_procs, ::testing::Contains(pid));
}

// Clean up each child proc.
for (auto &child_proc : actual_child_procs) {
child_proc.join();
}
#else
auto result = GetAllProcsWithPpid(1);
ASSERT_EQ(result, std::nullopt);
#endif
}

} // namespace ray

int main(int argc, char **argv) {
