From 52b2a2d9759372e747128cd2a141d94d6e1d00ce Mon Sep 17 00:00:00 2001
From: clarng <clarence.wyng@gmail.com>
Date: Sat, 19 Nov 2022 19:50:06 -0800
Subject: [PATCH] [core] print top-n memory usage when task OOMs, update docs (#30312)

Signed-off-by: Clarence Ng <clarence.wyng@gmail.com>

Why are these changes needed?

- Prints the memory usage of the killed process plus the top N processes on the node.
- Some logging cleanup to remove a span that would otherwise show up in the doc examples.
- Updates the docs to reflect the latest content.

Signed-off-by: Weichen Xu
---
 BUILD.bazel                                  |   1 +
 doc/source/ray-contribute/stability.rst      |   2 +
 .../scheduling/ray-oom-prevention.rst        |  58 ++++---
 src/ray/common/memory_monitor.cc             | 124 +++++++++++++--
 src/ray/common/memory_monitor.h              |  71 ++++++++-
 src/ray/common/test/memory_monitor_test.cc   | 147 ++++++++++++++++++
 src/ray/raylet/node_manager.cc               |  38 +++--
 src/ray/raylet/worker_killing_policy.cc      |  15 +-
 src/ray/raylet/worker_killing_policy.h       |   8 +-
 src/ray/raylet/worker_killing_policy_test.cc |  12 +-
 10 files changed, 412 insertions(+), 64 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index 9fcff7d1b474..78b66f49462a 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1610,6 +1610,7 @@ cc_test(
     ],
     deps = [
         ":ray_common",
+        "@boost//:filesystem",
         "@com_google_googletest//:gtest_main",
     ],
 )

diff --git a/doc/source/ray-contribute/stability.rst b/doc/source/ray-contribute/stability.rst
index c8d5910ff75e..136ac4aa44d6 100644
--- a/doc/source/ray-contribute/stability.rst
+++ b/doc/source/ray-contribute/stability.rst
@@ -26,6 +26,8 @@ of them individually.
 Breaking changes **must** be both allowed and expected in alpha components, and
 users **must** have no expectation of stability.

+.. _api-stability-beta:
+
 Beta
 ~~~~

diff --git a/doc/source/ray-core/scheduling/ray-oom-prevention.rst b/doc/source/ray-core/scheduling/ray-oom-prevention.rst
index 204aac503c25..41d67a577bc0 100644
--- a/doc/source/ray-core/scheduling/ray-oom-prevention.rst
+++ b/doc/source/ray-core/scheduling/ray-oom-prevention.rst
@@ -18,20 +18,20 @@ The memory monitor is a component that runs within the :ref:`raylet

 .. note::

-    The memory monitor is in :ref:`alpha `. It is disabled by default and needs to be enabled by setting the environment variable ``RAY_memory_monitor_interval_ms`` to a value greater than zero when Ray starts. It is available on Linux and is tested with Ray running inside a container that is using cgroup v1. If you encounter issues when running the memory monitor outside of a container or the container is using cgroup v2, please :ref:`file an issue or post a question `.
+    The memory monitor is in :ref:`beta <api-stability-beta>`. It is enabled by default and can be disabled by setting the environment variable ``RAY_memory_monitor_interval_ms`` to zero when Ray starts. It is available on Linux and is tested with Ray running inside a container that is using cgroup v1. If you encounter issues when running the memory monitor outside of a container or the container is using cgroup v2, please :ref:`file an issue or post a question `.

 How do I configure the memory monitor?
 --------------------------------------

 The memory monitor is controlled by the following environment variables:

-- ``RAY_memory_monitor_interval_ms (int, defaults to 0)`` is the interval to check memory usage and kill tasks or actors if needed. It is disabled when this value is 0.
+- ``RAY_memory_monitor_interval_ms (int, defaults to 250)`` is the interval to check memory usage and kill tasks or actors if needed. It is disabled when this value is 0.

-- ``RAY_memory_usage_threshold_fraction (float, defaults to 0.9)`` is the threshold when the node is beyond the memory
+- ``RAY_memory_usage_threshold_fraction (float, defaults to 0.98)`` is the threshold when the node is beyond the memory
 capacity. If the memory usage is above this value and the free space is
 below min_memory_free_bytes then it will start killing processes to free up space. Ranges from [0, 1].

-- ``RAY_min_memory_free_bytes (int, defaults to 1 GiB)`` is the minimum amount of free space. If the memory usage is above
+- ``RAY_min_memory_free_bytes (int, defaults to 512 MiB)`` is the minimum amount of free space. If the memory usage is above
 ``memory_usage_threshold_fraction`` and the free space is below this value then it will start killing
 processes to free up space. This setting is unused if it is set to -1.
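For a quick local sanity check of these settings, a sketch like the following can set them before Ray starts. This assumes a local ``ray.init()`` run where the raylet inherits the driver's environment; the values shown are just the documented defaults:

.. code-block:: python

    import os

    # Configure the monitor before Ray starts (illustrative values = the defaults).
    os.environ["RAY_memory_monitor_interval_ms"] = "250"
    os.environ["RAY_memory_usage_threshold_fraction"] = "0.98"
    os.environ["RAY_min_memory_free_bytes"] = str(512 * 1024 * 1024)

    import ray

    ray.init()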
@@ -78,8 +78,8 @@ Memory usage threshold

 The memory usage threshold is used by the memory monitor to determine when it should start killing
 processes to free up memory. The threshold is controlled by the two environment variables:

-- ``RAY_memory_usage_threshold_fraction`` (default: 0.9)
-- ``RAY_min_memory_free_bytes`` (default: 1 GiB)
+- ``RAY_memory_usage_threshold_fraction`` (default: 0.98)
+- ``RAY_min_memory_free_bytes`` (default: 512 MiB)

 When the node starts it computes the usage threshold as follows:
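Read together with the ``TestGetMemoryThresholdTakeGreaterOfTheTwoValues`` unit test further down in this patch, the effective threshold appears to be the greater of the two candidate values. A small Python sketch of that arithmetic (a hypothetical helper, not Ray API):

.. code-block:: python

    MiB = 1024 ** 2

    def memory_threshold(total_bytes, threshold_fraction, min_memory_free_bytes):
        """Greater of: fraction-of-total, or total minus the min free bytes."""
        from_fraction = total_bytes * threshold_fraction
        if min_memory_free_bytes == -1:  # the -1 sentinel disables this rule
            return int(from_fraction)
        return int(max(from_fraction, total_bytes - min_memory_free_bytes))

    # A 33.28 GB node with the defaults: max(0.98 * total, total - 512 MiB).
    total = 33_280_000_000
    print(memory_threshold(total, 0.98, 512 * MiB))  # ~32.74 GB here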
@@ -136,33 +136,43 @@ Let's create an application oom.py that will trigger the out-of-memory condition
     :end-before: __oom_end__

-To speed up the example, set ``RAY_task_oom_retries=1`` on the application so the task will only retry once if it is killed by the memory monitor.
+To speed up the example, set ``RAY_task_oom_retries=1`` on the application so the task will only retry once if it is killed by the memory monitor. Also set ``RAY_event_stats_print_interval_ms=1000`` so it prints the worker kill summary, which by default is every minute.

 .. code-block:: bash

-    $ RAY_task_oom_retries=1 python oom.py
+    RAY_event_stats_print_interval_ms=1000 RAY_task_oom_retries=1 python oom.py

-    INFO worker.py:1342 -- Connecting to existing Ray cluster at address: 172.17.0.2:6379...
-    INFO worker.py:1525 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
+    2022-11-17 09:16:40,792 INFO worker.py:1534 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265

-    WARNING worker.py:1839 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 8ce7275b7a7953cc794f8c138a616d91cb907c1b01000000 Worker ID: 5c9ac30f8a9eda340f651a204de5d94f1ff965c5d9f72175579bd8dd Node ID: 3a4b60759256926fd0e84a9ff596dab3d7be854134107ef21b0e0260 Worker IP address: 172.17.0.2 Worker port: 10003 Worker PID: 69161 Worker exit type: SYSTEM_ERROR Worker exit detail: Task was killed due to the node running low on memory.
-    Memory on the node (IP: 172.17.0.2, ID: 3a4b60759256926fd0e84a9ff596dab3d7be854134107ef21b0e0260) where the task was running was 32.91GB / 33.28GB (0.988698), which exceeds the memory usage threshold of 0.969955. Ray killed this worker (ID: 5c9ac30f8a9eda340f651a204de5d94f1ff965c5d9f72175579bd8dd) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 172.17.0.2`. To see the logs of the worker, use `ray logs worker-5c9ac30f8a9eda340f651a204de5d94f1ff965c5d9f72175579bd8dd*out -ip 172.17.0.2`.
-    Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the eviction threshold, set the environment variable `RAY_memory_usage_threshold_fraction` when starting Ray. To disable worker eviction, set the environment variable `RAY_memory_monitor_interval_ms` to zero.
+    (raylet) [2022-11-17 09:16:52,264 E 90996 90996] (raylet) node_manager.cc:3096: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 90efe99b630d4b1f6ac1504df64764732d555b526049638f9d86552f, IP: 172.17.0.2) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.17.0.2`
+    (raylet)
+    (raylet) Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold_fraction` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_interval_ms` to zero.

-    WARNING worker.py:1839 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: b60ff970726d7cf526e74acc71310ecce51edb4c01000000 Worker ID: 39416ad98016ee6a63173856a9b4e4100625be22e6ee4192722388ba Node ID: 3a4b60759256926fd0e84a9ff596dab3d7be854134107ef21b0e0260 Worker IP address: 172.17.0.2 Worker port: 10004 Worker PID: 69160 Worker exit type: SYSTEM_ERROR Worker exit detail: Task was killed due to the node running low on memory.
-    Memory on the node (IP: 172.17.0.2, ID: 3a4b60759256926fd0e84a9ff596dab3d7be854134107ef21b0e0260) where the task was running was 32.53GB / 33.28GB (0.977449), which exceeds the memory usage threshold of 0.969955. Ray killed this worker (ID: 39416ad98016ee6a63173856a9b4e4100625be22e6ee4192722388ba) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 172.17.0.2`. To see the logs of the worker, use `ray logs worker-39416ad98016ee6a63173856a9b4e4100625be22e6ee4192722388ba*out -ip 172.17.0.2`.
-    Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the eviction threshold, set the environment variable `RAY_memory_usage_threshold_fraction` when starting Ray. To disable worker eviction, set the environment variable `RAY_memory_monitor_interval_ms` to zero.
+    (raylet) [2022-11-17 09:17:03,461 E 90996 90996] (raylet) node_manager.cc:3096: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 90efe99b630d4b1f6ac1504df64764732d555b526049638f9d86552f, IP: 172.17.0.2) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.17.0.2`
+    (raylet)
+    (raylet) Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold_fraction` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_interval_ms` to zero.
     Traceback (most recent call last):
-      File "simple.py", line 11, in <module>
-        ray.get(tasks)
+      File "oom.py", line 11, in <module>
+        ray.get(allocate_memory.remote())
       File "/home/ray/github/rayclarng/ray/python/ray/_private/client_mode_hook.py", line 105, in wrapper
         return func(*args, **kwargs)
-      File "/home/ray/github/rayclarng/ray/python/ray/_private/worker.py", line 2291, in get
+      File "/home/ray/github/rayclarng/ray/python/ray/_private/worker.py", line 2310, in get
         raise value
     ray.exceptions.OutOfMemoryError: Task was killed due to the node running low on memory.
-    Memory on the node (IP: 172.17.0.2, ID: 3a4b60759256926fd0e84a9ff596dab3d7be854134107ef21b0e0260) where the task was running was 32.53GB / 33.28GB (0.977449), which exceeds the memory usage threshold of 0.969955. Ray killed this worker (ID: 39416ad98016ee6a63173856a9b4e4100625be22e6ee4192722388ba) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 172.17.0.2`. To see the logs of the worker, use `ray logs worker-39416ad98016ee6a63173856a9b4e4100625be22e6ee4192722388ba*out -ip 172.17.0.2`.
-    Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the eviction threshold, set the environment variable `RAY_memory_usage_threshold_fraction` when starting Ray. To disable worker eviction, set the environment variable `RAY_memory_monitor_interval_ms` to zero.
+    Memory on the node (IP: 172.17.0.2, ID: 90efe99b630d4b1f6ac1504df64764732d555b526049638f9d86552f) where the task (task ID: a6755d1708846b10007fda8a687eb57eb8a083c001000000, name=allocate_memory, pid=91085, memory used=24.99GB) was running was 32.62GB / 33.28GB (0.980175), which exceeds the memory usage threshold of 0.96. Ray killed this worker (ID: a8101629b7605f88776a08193f108adcc637248d976add819bbecbba) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 172.17.0.2`. To see the logs of the worker, use `ray logs worker-a8101629b7605f88776a08193f108adcc637248d976add819bbecbba*out -ip 172.17.0.2. Top 10 memory users:
+    PID     MEM(GB) COMMAND
+    91085   24.99   ray::allocate_memory
+    57330   2.63    /home/ray/.vscode-server/extensions/ms-vscode.cpptools-1.12.4-linux-x64/bin/cpptools
+    48949   1.70    /home/ray/.vscode-server/bin/d045a5eda657f4d7b676dedbfa7aab8207f8a075/node /home/ray/.vscode-server/...
+    54387   0.80    bazel(ray) -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/ray/.cache/bazel/_bazel_ray/8c472b...
+    35099   0.66    /home/ray/.vscode-server/extensions/ms-vscode.cpptools-1.12.4-linux-x64/bin/cpptools-srv 57330 {1729...
+    16821   0.23    /home/ray/.vscode-server/bin/d045a5eda657f4d7b676dedbfa7aab8207f8a075/node /home/ray/.vscode-server/...
+    61800   0.17    /home/ray/.vscode-server/extensions/ms-vscode.cpptools-1.12.4-linux-x64/bin/cpptools-srv 57330 {54EF...
+    91043   0.07    /home/ray/anaconda3/bin/python -u /home/ray/github/rayclarng/ray/python/ray/dashboard/agent.py --nod...
+    90935   0.07    /home/ray/anaconda3/bin/python /home/ray/github/rayclarng/ray/python/ray/dashboard/dashboard.py --ho...
+    90870   0.07    python oom.py
+    Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold_fraction` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_interval_ms` to zero.
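The raised error is a regular Python exception, so applications can catch it once OOM retries are exhausted. A minimal sketch (the ``allocate_memory`` task here is a stand-in for the one defined in oom.py above):

.. code-block:: python

    import ray
    from ray.exceptions import OutOfMemoryError

    @ray.remote
    def allocate_memory():
        # Stand-in body: allocate until the memory monitor kills the worker.
        chunks = []
        while True:
            chunks.append(bytearray(100 * 1024 * 1024))

    try:
        ray.get(allocate_memory.remote())
    except OutOfMemoryError:
        # Retries are exhausted: shed load or surface the failure here.
        print("task was killed by the memory monitor")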
 Verify the task was indeed executed twice via ``task_oom_retry``:

 @@ -170,11 +180,11 @@

     $ grep -r "retries left" /tmp/ray/session_latest/logs/

-    /tmp/ray/session_latest/logs/python-core-driver-01000000ffffffffffffffffffffffffffffffffffffffffffffffff_60002.log:[2022-10-12 16:14:07,723 I 60002 60031] task_manager.cc:458: task c8ef45ccd0112571ffffffffffffffffffffffff01000000 retries left: 3, oom retries left: 1, task failed due to oom: 1
+    /tmp/ray/session_latest/logs/python-core-driver-01000000ffffffffffffffffffffffffffffffffffffffffffffffff_87487.log:[2022-11-15 13:50:27,653 I 87487 87703] task_manager.cc:458: task c8ef45ccd0112571ffffffffffffffffffffffff01000000 retries left: 3, oom retries left: 1, task failed due to oom: 1

-    /tmp/ray/session_latest/logs/python-core-driver-01000000ffffffffffffffffffffffffffffffffffffffffffffffff_60002.log:[2022-10-12 16:14:18,843 I 60002 60031] task_manager.cc:458: task c8ef45ccd0112571ffffffffffffffffffffffff01000000 retries left: 3, oom retries left: 0, task failed due to oom: 1
+    /tmp/ray/session_latest/logs/python-core-driver-01000000ffffffffffffffffffffffffffffffffffffffffffffffff_87487.log:[2022-11-15 13:50:36,671 I 87487 87703] task_manager.cc:458: task c8ef45ccd0112571ffffffffffffffffffffffff01000000 retries left: 3, oom retries left: 0, task failed due to oom: 1

-    /tmp/ray/session_latest/logs/python-core-driver-01000000ffffffffffffffffffffffffffffffffffffffffffffffff_60002.log:[2022-10-12 16:14:18,843 I 60002 60031] task_manager.cc:466: No retries left for task c8ef45ccd0112571ffffffffffffffffffffffff01000000, not going to resubmit.
+    /tmp/ray/session_latest/logs/python-core-driver-01000000ffffffffffffffffffffffffffffffffffffffffffffffff_87487.log:[2022-11-15 13:50:36,671 I 87487 87703] task_manager.cc:466: No retries left for task c8ef45ccd0112571ffffffffffffffffffffffff01000000, not going to resubmit.

 .. note::

diff --git a/src/ray/common/memory_monitor.cc b/src/ray/common/memory_monitor.cc
index c3eb06a91a71..da40c73401de 100644
--- a/src/ray/common/memory_monitor.cc
+++ b/src/ray/common/memory_monitor.cc
@@ -14,12 +14,14 @@

 #include "ray/common/memory_monitor.h"

+#include <boost/algorithm/string.hpp>
+#include <filesystem>
 #include <fstream>  // std::ifstream
 #include <tuple>

 #include "ray/common/ray_config.h"
 #include "ray/util/logging.h"
+#include "ray/util/process.h"
 #include "ray/util/util.h"

 namespace ray {

@@ -302,17 +304,17 @@ std::tuple<int64_t, int64_t> MemoryMonitor::GetLinuxMemoryBytes() {
   return {used_bytes, mem_total_bytes};
 }

-int64_t MemoryMonitor::GetProcessMemoryBytes(int64_t process_id) const {
-  std::stringstream smap_path;
-  smap_path << "/proc/" << std::to_string(process_id) << "/smaps_rollup";
-  return GetLinuxProcessMemoryBytesFromSmap(smap_path.str());
+int64_t MemoryMonitor::GetProcessMemoryBytes(pid_t pid, const std::string proc_dir) {
+  std::stringstream smaps_path;
+  smaps_path << proc_dir << "/" << std::to_string(pid) << "/smaps_rollup";
+  return GetLinuxProcessMemoryBytesFromSmap(smaps_path.str());
 }

 /// TODO:(clarng) align logic with psutil / Python-side memory calculations
 int64_t MemoryMonitor::GetLinuxProcessMemoryBytesFromSmap(const std::string smap_path) {
   std::ifstream smap_ifs(smap_path, std::ios::in | std::ios::binary);
   if (!smap_ifs.is_open()) {
-    RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs) << " file not found: " << smap_path;
+    RAY_LOG_EVERY_MS(WARNING, kLogIntervalMs) << " file not found: " << smap_path;
     return kNull;
   }

@@ -331,15 +333,14 @@
     /// Linux reports them as kiB
     RAY_CHECK(unit == "kB");

-    value = value * 1024;
-    if (title == "Private_Clean:" || title == "Private_Dirty:" ||
-        title == "Private_Hugetlb:") {
-      uss += value;
+    /// Captures Private_Clean, Private_Dirty, Private_Hugetlb
+    if (boost::starts_with(title, "Private_")) {
+      uss += value * 1024;
     }
   }

   if (uss == 0) {
-    RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs)
+    RAY_LOG_EVERY_MS(WARNING, kLogIntervalMs)
         << "Got zero used memory for smap file " << smap_path;
     return kNull;
   }
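For readers unfamiliar with ``smaps_rollup``: the kernel reports per-process memory totals there, and the monitor approximates USS by summing the ``Private_*`` rows (values are reported in kB). A rough Python equivalent of the parsing above, for illustration only:

.. code-block:: python

    def process_uss_bytes(pid, proc_dir="/proc"):
        """Sum Private_Clean/Private_Dirty/Private_Hugetlb, like the C++ above."""
        uss_kb = 0
        try:
            with open(f"{proc_dir}/{pid}/smaps_rollup") as f:
                for line in f:
                    if line.startswith("Private_"):
                        uss_kb += int(line.split()[1])  # e.g. "Private_Clean: 1024 kB"
        except OSError:
            return None  # mirrors the kNull "file not found" branch
        return None if uss_kb == 0 else uss_kb * 1024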
@@ -378,6 +379,109 @@
   }
 }

+const std::vector<pid_t> MemoryMonitor::GetPidsFromDir(const std::string proc_dir) {
+  std::vector<pid_t> pids;
+  if (!std::filesystem::exists(proc_dir)) {
+    RAY_LOG_EVERY_MS(INFO, kLogIntervalMs)
+        << "Proc dir doesn't exist, return no pids. Dir: " << proc_dir;
+    return pids;
+  }
+  for (const auto &file : std::filesystem::directory_iterator(proc_dir)) {
+    std::string filename{file.path().filename().u8string()};
+    if (std::all_of(filename.begin(), filename.end(), ::isdigit)) {
+      pids.push_back(static_cast<pid_t>(std::stoi(filename)));
+    }
+  }
+  return pids;
+}
+
+const std::string MemoryMonitor::GetCommandLineForPid(pid_t pid,
+                                                      const std::string proc_dir) {
+  std::string path =
+      proc_dir + "/" + std::to_string(pid) + "/" + MemoryMonitor::kCommandlinePath;
+  std::ifstream commandline_ifs(path, std::ios::in | std::ios::binary);
+  if (!commandline_ifs.is_open()) {
+    RAY_LOG_EVERY_MS(INFO, kLogIntervalMs)
+        << "Command line path doesn't exist, returning empty command. Path: " << path;
+    return {};
+  }
+
+  std::string line;
+  while (std::getline(commandline_ifs, line)) {
+    std::replace(line.begin(), line.end(), '\0', ' ');
+    boost::trim(line);
+    return line;
+  }
+  RAY_LOG_EVERY_MS(INFO, kLogIntervalMs)
+      << "Empty file. Returning empty command. Path: " << path;
+  return {};
+}
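``/proc/<pid>/cmdline`` stores the argument vector separated by NUL bytes, which is why the code above swaps NULs for spaces and trims the result. The same idea as a Python sketch:

.. code-block:: python

    def cmdline_for_pid(pid, proc_dir="/proc"):
        try:
            with open(f"{proc_dir}/{pid}/cmdline", "rb") as f:
                raw = f.read()
        except OSError:
            return ""  # process is gone or the path is unreadable
        # NUL-separated argv -> single space-separated command line.
        return raw.replace(b"\0", b" ").decode(errors="replace").strip()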
Path: " << path; + return {}; +} + +const std::string MemoryMonitor::TopNMemoryDebugString(uint32_t top_n, + const MemorySnapshot system_memory, + const std::string proc_dir) { + auto pid_to_memory_usage = + MemoryMonitor::GetTopNMemoryUsage(top_n, system_memory.process_used_bytes); + + std::string debug_string = "PID\tMEM(GB)\tCOMMAND"; + for (std::tuple entry : pid_to_memory_usage) { + auto [pid, memory_used_bytes] = entry; + auto pid_string = std::to_string(pid); + auto memory_usage_gb = + FormatFloat(static_cast(memory_used_bytes) / 1024 / 1024 / 1024, 2); + auto commandline = MemoryMonitor::TruncateString( + MemoryMonitor::GetCommandLineForPid(pid, proc_dir), 100); + debug_string += "\n" + pid_string + "\t" + memory_usage_gb + "\t" + commandline; + } + + return debug_string; +} + +const std::vector> MemoryMonitor::GetTopNMemoryUsage( + uint32_t top_n, const absl::flat_hash_map all_usage) { + std::vector> pid_to_memory_usage; + for (auto entry : all_usage) { + pid_to_memory_usage.push_back({entry.first, entry.second}); + } + + std::sort(pid_to_memory_usage.begin(), + pid_to_memory_usage.end(), + [](std::tuple const &left, + std::tuple const &right) -> bool { + auto [pid_left, memory_used_bytes_left] = left; + auto [pid_right, memory_used_bytes_right] = right; + return memory_used_bytes_left > memory_used_bytes_right; + }); + + if (pid_to_memory_usage.size() > top_n) { + pid_to_memory_usage.resize(top_n); + } + + return pid_to_memory_usage; +} + +const absl::flat_hash_map MemoryMonitor::GetProcessMemoryUsage( + const std::string proc_dir) { + std::vector pids = MemoryMonitor::GetPidsFromDir(proc_dir); + absl::flat_hash_map pid_to_memory_usage; + + for (int32_t pid : pids) { + int64_t memory_used_bytes = MemoryMonitor::GetProcessMemoryBytes(pid, proc_dir); + if (memory_used_bytes != kNull) { + pid_to_memory_usage.insert({pid, memory_used_bytes}); + } + } + return pid_to_memory_usage; +} + +const std::string MemoryMonitor::TruncateString(const std::string value, + uint32_t max_length) { + if (value.length() > max_length) { + return value.substr(0, max_length) + "..."; + } + return value; +} + std::ostream &operator<<(std::ostream &os, const MemorySnapshot &memory_snapshot) { os << "Used bytes: " << memory_snapshot.used_bytes << ", Total bytes: " << memory_snapshot.total_bytes; diff --git a/src/ray/common/memory_monitor.h b/src/ray/common/memory_monitor.h index df369d47594a..56f0af37571c 100644 --- a/src/ray/common/memory_monitor.h +++ b/src/ray/common/memory_monitor.h @@ -18,6 +18,7 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/asio/periodical_runner.h" +#include "ray/util/process.h" namespace ray { @@ -29,6 +30,9 @@ struct MemorySnapshot { /// The total memory that can be used. 
diff --git a/src/ray/common/memory_monitor.h b/src/ray/common/memory_monitor.h
index df369d47594a..56f0af37571c 100644
--- a/src/ray/common/memory_monitor.h
+++ b/src/ray/common/memory_monitor.h
@@ -18,6 +18,7 @@

 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/asio/periodical_runner.h"
+#include "ray/util/process.h"

 namespace ray {

@@ -29,6 +30,9 @@ struct MemorySnapshot {
   /// The total memory that can be used. >= used_bytes;
   int64_t total_bytes;

+  /// The per-process memory used;
+  absl::flat_hash_map<pid_t, int64_t> process_used_bytes;
+
   friend std::ostream &operator<<(std::ostream &os,
                                   const MemorySnapshot &memory_snapshot);
 };

@@ -64,9 +68,22 @@ class MemoryMonitor {
       MemoryUsageRefreshCallback monitor_callback);

  public:
-  /// \param process_id the process id
-  /// \return the used memory in bytes for the process
-  int64_t GetProcessMemoryBytes(int64_t process_id) const;
+  /// \param top_n the number of top memory-using processes
+  /// \param system_memory the snapshot of memory usage
+  /// \param proc_dir the directory to scan for the processes
+  ///
+  /// \return the debug string that contains up to the top N memory-using processes,
+  /// empty if process directory is invalid
+  static const std::string TopNMemoryDebugString(
+      uint32_t top_n,
+      const MemorySnapshot system_memory,
+      const std::string proc_dir = kProcDirectory);
+
+  /// \param proc_dir the directory to scan for the processes
+  ///
+  /// \return the pid to memory usage map for all the processes
+  static const absl::flat_hash_map<pid_t, int64_t> GetProcessMemoryUsage(
+      const std::string proc_dir = kProcDirectory);

  private:
   static constexpr char kCgroupsV1MemoryMaxPath[] =
@@ -78,6 +95,8 @@
   static constexpr char kCgroupsV2MemoryUsagePath[] = "/sys/fs/cgroup/memory.current";
   static constexpr char kCgroupsV2MemoryStatPath[] = "/sys/fs/cgroup/memory.stat";
   static constexpr char kCgroupsV2MemoryStatInactiveKey[] = "inactive_file";
+  static constexpr char kProcDirectory[] = "/proc";
+  static constexpr char kCommandlinePath[] = "cmdline";
   /// The logging frequency. Decoupled from how often the monitor runs.
   static constexpr uint32_t kLogIntervalMs = 5000;
   static constexpr int64_t kNull = -1;

@@ -115,6 +134,29 @@
   /// not exist or if it fails to read a valid value.
   static int64_t GetLinuxProcessMemoryBytesFromSmap(const std::string smap_path);

+  /// \param proc_dir directory to scan for the process ids
+  ///
+  /// \return list of process ids found in the directory,
+  /// or empty list if the directory doesn't exist
+  static const std::vector<pid_t> GetPidsFromDir(
+      const std::string proc_dir = kProcDirectory);
+
+  /// \param pid the process id
+  /// \param proc_dir directory to scan for the process ids
+  ///
+  /// \return the command line for the executing process,
+  /// or empty string if the process doesn't exist
+  static const std::string GetCommandLineForPid(
+      pid_t pid, const std::string proc_dir = kProcDirectory);
+
+  /// Truncates the string if it is too long and appends '...'
+  ///
+  /// \param value the string to truncate
+  /// \param max_length the max length of the string value to preserve
+  ///
+  /// \return the truncated string
+  static const std::string TruncateString(const std::string value, uint32_t max_length);
+
   /// \return the smaller of the two integers, kNull if both are kNull,
   /// or one of the values if the other is kNull.
   static int64_t NullableMin(int64_t left, int64_t right);

@@ -133,6 +175,21 @@
       float usage_threshold,
       int64_t min_memory_free_bytes);

+  /// \param pid the process id
+  /// \param proc_dir the process directory
+  ///
+  /// \return the used memory in bytes for the process,
+  /// kNull if the file doesn't exist or it fails to find the fields
+  static int64_t GetProcessMemoryBytes(pid_t pid,
+                                       const std::string proc_dir = kProcDirectory);
+
+  /// \param top_n the number of top memory-using processes
+  /// \param all_usage process to memory usage map
+  ///
+  /// \return the top N memory-using processes
+  static const std::vector<std::tuple<pid_t, int64_t>> GetTopNMemoryUsage(
+      uint32_t top_n, const absl::flat_hash_map<pid_t, int64_t> all_usage);
+
  private:
   FRIEND_TEST(MemoryMonitorTest, TestThresholdZeroMonitorAlwaysAboveThreshold);
   FRIEND_TEST(MemoryMonitorTest, TestThresholdOneMonitorAlwaysBelowThreshold);
@@ -151,6 +208,14 @@
   FRIEND_TEST(MemoryMonitorTest, TestMonitorPeriodSetMaxUsageThresholdCallbackExecuted);
   FRIEND_TEST(MemoryMonitorTest, TestMonitorPeriodDisableMinMemoryCallbackExecuted);
   FRIEND_TEST(MemoryMonitorTest, TestGetMemoryThresholdTakeGreaterOfTheTwoValues);
+  FRIEND_TEST(MemoryMonitorTest, TestGetPidsFromDirOnlyReturnsNumericFilenames);
+  FRIEND_TEST(MemoryMonitorTest, TestGetPidsFromNonExistentDirReturnsEmpty);
+  FRIEND_TEST(MemoryMonitorTest, TestGetCommandLinePidExistReturnsValid);
+  FRIEND_TEST(MemoryMonitorTest, TestGetCommandLineMissingFileReturnsEmpty);
+  FRIEND_TEST(MemoryMonitorTest, TestShortStringNotTruncated);
+  FRIEND_TEST(MemoryMonitorTest, TestLongStringTruncated);
+  FRIEND_TEST(MemoryMonitorTest, TestTopNLessThanNReturnsMemoryUsedDesc);
+  FRIEND_TEST(MemoryMonitorTest, TestTopNMoreThanNReturnsAllDesc);

   /// Memory usage fraction between [0, 1]
   const float usage_threshold_;

diff --git a/src/ray/common/test/memory_monitor_test.cc b/src/ray/common/test/memory_monitor_test.cc
index 979204613d4f..135220562164 100644
--- a/src/ray/common/test/memory_monitor_test.cc
+++ b/src/ray/common/test/memory_monitor_test.cc
@@ -16,6 +16,8 @@

 #include <sys/sysinfo.h>

+#include <boost/filesystem.hpp>
+#include <fstream>
 #include <memory>

 #include "gtest/gtest.h"
@@ -24,6 +26,7 @@
 #include "ray/util/process.h"

 namespace ray {
+
 class MemoryMonitorTest : public ::testing::Test {
  protected:
   void SetUp() override {
@@ -38,6 +41,21 @@
   }
   std::unique_ptr<std::thread> thread_;
   instrumented_io_context io_context_;
+
+  void MakeMemoryUsage(pid_t pid,
+                       const std::string usage_kb,
+                       const std::string proc_dir) {
+    boost::filesystem::create_directory(proc_dir);
+    boost::filesystem::create_directory(proc_dir + "/" + std::to_string(pid));
+
+    std::string usage_filename = proc_dir + "/" + std::to_string(pid) + "/smaps_rollup";
+
+    std::ofstream usage_file;
+    usage_file.open(usage_filename);
+    usage_file << "SomeHeader" << std::endl;
+    usage_file << "Private_Clean: " << usage_kb << " kB" << std::endl;
+    usage_file.close();
+  }
 };

 TEST_F(MemoryMonitorTest, TestThresholdZeroMonitorAlwaysAboveThreshold) {

@@ -367,6 +385,135 @@
 TEST_F(MemoryMonitorTest, TestGetMemoryThresholdTakeGreaterOfTheTwoValues) {
   ASSERT_EQ(MemoryMonitor::GetMemoryThreshold(100, 1, MemoryMonitor::kNull), 100);
 }

+TEST_F(MemoryMonitorTest, TestGetPidsFromDirOnlyReturnsNumericFilenames) {
+  std::string proc_dir = UniqueID::FromRandom().Hex();
+  boost::filesystem::create_directory(proc_dir);
+
+  std::string num_filename = proc_dir + "/123";
+  std::string non_num_filename = proc_dir + "/123b";
+
+  std::ofstream num_file;
+  num_file.open(num_filename);
+  num_file << num_filename;
+  num_file.close();
+
+  std::ofstream non_num_file;
+  non_num_file.open(non_num_filename);
+  non_num_file << non_num_filename;
+  non_num_file.close();
+
+  auto pids = MemoryMonitor::GetPidsFromDir(proc_dir);
+
+  boost::filesystem::remove_all(proc_dir);
+
+  ASSERT_EQ(pids.size(), 1);
+  ASSERT_EQ(pids[0], 123);
+}
+
+TEST_F(MemoryMonitorTest, TestGetPidsFromNonExistentDirReturnsEmpty) {
+  std::string proc_dir = UniqueID::FromRandom().Hex();
+  auto pids = MemoryMonitor::GetPidsFromDir(proc_dir);
+  ASSERT_EQ(pids.size(), 0);
+}
+
+TEST_F(MemoryMonitorTest, TestGetCommandLinePidExistReturnsValid) {
+  std::string proc_dir = UniqueID::FromRandom().Hex();
+  std::string pid_dir = proc_dir + "/123";
+  boost::filesystem::create_directories(pid_dir);
+
+  std::string cmdline_filename = pid_dir + "/" + MemoryMonitor::kCommandlinePath;
+
+  std::ofstream cmdline_file;
+  cmdline_file.open(cmdline_filename);
+  cmdline_file << "/my/very/custom/command --test passes!    ";
+  cmdline_file.close();
+
+  std::string commandline = MemoryMonitor::GetCommandLineForPid(123, proc_dir);
+
+  boost::filesystem::remove_all(proc_dir);
+
+  ASSERT_EQ(commandline, "/my/very/custom/command --test passes!");
+}
+
+TEST_F(MemoryMonitorTest, TestGetCommandLineMissingFileReturnsEmpty) {
+  {
+    std::string proc_dir = UniqueID::FromRandom().Hex();
+    std::string commandline = MemoryMonitor::GetCommandLineForPid(123, proc_dir);
+    boost::filesystem::remove_all(proc_dir);
+    ASSERT_EQ(commandline, "");
+  }
+
+  {
+    std::string proc_dir = UniqueID::FromRandom().Hex();
+    boost::filesystem::create_directory(proc_dir);
+    std::string commandline = MemoryMonitor::GetCommandLineForPid(123, proc_dir);
+    boost::filesystem::remove_all(proc_dir);
+    ASSERT_EQ(commandline, "");
+  }
+
+  {
+    std::string proc_dir = UniqueID::FromRandom().Hex();
+    std::string pid_dir = proc_dir + "/123";
+    boost::filesystem::create_directories(pid_dir);
+    std::string commandline = MemoryMonitor::GetCommandLineForPid(123, proc_dir);
+    boost::filesystem::remove_all(proc_dir);
+    ASSERT_EQ(commandline, "");
+  }
+}
+
+TEST_F(MemoryMonitorTest, TestShortStringNotTruncated) {
+  std::string out = MemoryMonitor::TruncateString("im short", 20);
+  ASSERT_EQ(out, "im short");
+}
+
+TEST_F(MemoryMonitorTest, TestLongStringTruncated) {
+  std::string out = MemoryMonitor::TruncateString(std::string(7, 'k'), 5);
+  ASSERT_EQ(out, "kkkkk...");
+}
+
+TEST_F(MemoryMonitorTest, TestTopNLessThanNReturnsMemoryUsedDesc) {
+  absl::flat_hash_map<pid_t, int64_t> usage;
+  usage.insert({1, 111});
+  usage.insert({2, 222});
+  usage.insert({3, 333});
+
+  auto list = MemoryMonitor::GetTopNMemoryUsage(2, usage);
+
+  ASSERT_EQ(list.size(), 2);
+  ASSERT_EQ(std::get<0>(list[0]), 3);
+  ASSERT_EQ(std::get<1>(list[0]), 333);
+  ASSERT_EQ(std::get<0>(list[1]), 2);
+  ASSERT_EQ(std::get<1>(list[1]), 222);
+}
+
+TEST_F(MemoryMonitorTest, TestTopNMoreThanNReturnsAllDesc) {
+  absl::flat_hash_map<pid_t, int64_t> usage;
+  usage.insert({1, 111});
+  usage.insert({2, 222});
+
+  auto list = MemoryMonitor::GetTopNMemoryUsage(3, usage);
+
+  ASSERT_EQ(list.size(), 2);
+  ASSERT_EQ(std::get<0>(list[0]), 2);
+  ASSERT_EQ(std::get<1>(list[0]), 222);
+  ASSERT_EQ(std::get<0>(list[1]), 1);
+  ASSERT_EQ(std::get<1>(list[1]), 111);
+}
+
+TEST_F(MemoryMonitorTest, TestGetProcessMemoryUsageFiltersBadPids) {
+  std::string proc_dir = UniqueID::FromRandom().Hex();
+  MakeMemoryUsage(1, "111", proc_dir);
+
+  // Invalid pids with no memory usage file.
+  boost::filesystem::create_directory(proc_dir + "/2");
+  boost::filesystem::create_directory(proc_dir + "/3");
+
+  auto usage = MemoryMonitor::GetProcessMemoryUsage(proc_dir);
+
+  ASSERT_EQ(usage.size(), 1);
+  ASSERT_TRUE(usage.contains(1));
+}
+
 }  // namespace ray

 int main(int argc, char **argv) {

diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 387ab5cdf474..021b595017f6 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2927,6 +2927,7 @@ MemoryUsageRefreshCallback NodeManager::CreateMemoryUsageRefreshCallback() {
           << "worker pid: " << high_memory_eviction_target_->GetProcess().GetId()
           << "task: " << high_memory_eviction_target_->GetAssignedTaskId();
     } else {
+      system_memory.process_used_bytes = MemoryMonitor::GetProcessMemoryUsage();
       auto workers = worker_pool_.GetAllRegisteredWorkers();
       if (workers.empty()) {
         RAY_LOG_EVERY_MS(WARNING, 5000)
@@ -2937,7 +2938,7 @@
       }
       RetriableLIFOWorkerKillingPolicy worker_killing_policy;
       auto worker_to_kill =
-          worker_killing_policy.SelectWorkerToKill(workers, *memory_monitor_.get());
+          worker_killing_policy.SelectWorkerToKill(workers, system_memory);
       if (worker_to_kill == nullptr) {
         RAY_LOG_EVERY_MS(WARNING, 5000) << "Worker killer did not select a worker to "
                                            "kill even though memory usage is high.";
@@ -2945,8 +2946,6 @@
       high_memory_eviction_target_ = worker_to_kill;

       /// TODO: (clarng) expose these strings in the frontend python error as well.
-      static std::string oom_kill_title =
-          "Task was killed due to the node running low on memory. ";
       std::string oom_kill_details = this->CreateOomKillMessageDetails(
           worker_to_kill, this->self_node_id_, system_memory, usage_threshold);
       std::string oom_kill_suggestions =
@@ -2960,8 +2959,10 @@
           << oom_kill_suggestions;

       std::stringstream worker_exit_message_ss;
-      worker_exit_message_ss << oom_kill_title << oom_kill_details
-                             << oom_kill_suggestions;
+      worker_exit_message_ss
+          << "Task was killed due to the node running low on memory.\n"
+          << oom_kill_details << "\n"
+          << oom_kill_suggestions;
       std::string worker_exit_message = worker_exit_message_ss.str();

       rpc::RayErrorInfo task_failure_reason;
@@ -3003,18 +3004,35 @@ const std::string NodeManager::CreateOomKillMessageDetails(
       FormatFloat(static_cast<float>(system_memory.total_bytes) / 1024 / 1024 / 1024, 2);
   std::stringstream oom_kill_details_ss;
+
+  auto pid = worker->GetProcess().GetId();
+  int64_t used_bytes = 0;
+  const auto pid_entry = system_memory.process_used_bytes.find(pid);
+  if (pid_entry != system_memory.process_used_bytes.end()) {
+    used_bytes = pid_entry->second;
+  } else {
+    // Fall through and report zero memory used for this process.
+    RAY_LOG_EVERY_MS(INFO, 60000)
+        << "Can't find memory usage for PID, reporting zero. PID: " << pid;
+  }
+  std::string process_used_bytes_gb =
+      FormatFloat(static_cast<float>(used_bytes) / 1024 / 1024 / 1024, 2);
+
   oom_kill_details_ss
       << "Memory on the node (IP: " << worker->IpAddress() << ", ID: " << node_id
       << ") where the task (" << worker->GetTaskOrActorIdAsDebugString()
-      << ", name= " << worker->GetAssignedTask().GetTaskSpecification().GetName()
-      << ") was running was " << used_bytes_gb << "GB / " << total_bytes_gb << "GB ("
-      << usage_fraction << "), which exceeds the memory usage threshold of "
-      << usage_threshold << ". Ray killed this worker (ID: " << worker->WorkerId()
Ray killed this worker (ID: " << worker->WorkerId() + << ", name=" << worker->GetAssignedTask().GetTaskSpecification().GetName() + << ", pid=" << worker->GetProcess().GetId() + << ", memory used=" << process_used_bytes_gb << "GB) was running was " + << used_bytes_gb << "GB / " << total_bytes_gb << "GB (" << usage_fraction + << "), which exceeds the memory usage threshold of " << usage_threshold + << ". Ray killed this worker (ID: " << worker->WorkerId() << ") because it was the most recently scheduled task; to see more " "information about memory usage on this node, use `ray logs raylet.out " "-ip " << worker->IpAddress() << "`. To see the logs of the worker, use `ray logs worker-" - << worker->WorkerId() << "*out -ip " << worker->IpAddress() << ". "; + << worker->WorkerId() << "*out -ip " << worker->IpAddress() + << ". Top 10 memory users:\n" + << MemoryMonitor::TopNMemoryDebugString(10, system_memory); return oom_kill_details_ss.str(); } diff --git a/src/ray/raylet/worker_killing_policy.cc b/src/ray/raylet/worker_killing_policy.cc index c70f52fbf11d..338fded1f4d0 100644 --- a/src/ray/raylet/worker_killing_policy.cc +++ b/src/ray/raylet/worker_killing_policy.cc @@ -30,7 +30,7 @@ RetriableLIFOWorkerKillingPolicy::RetriableLIFOWorkerKillingPolicy() {} const std::shared_ptr RetriableLIFOWorkerKillingPolicy::SelectWorkerToKill( const std::vector> &workers, - const MemoryMonitor &memory_monitor) const { + const MemorySnapshot &system_memory) const { if (workers.empty()) { RAY_LOG_EVERY_MS(INFO, 5000) << "Worker list is empty. Nothing can be killed"; return nullptr; @@ -55,7 +55,7 @@ RetriableLIFOWorkerKillingPolicy::SelectWorkerToKill( const static int32_t max_to_print = 10; RAY_LOG(INFO) << "The top 10 workers to be killed based on the worker killing policy:\n" - << WorkersDebugString(sorted, max_to_print, memory_monitor); + << WorkersDebugString(sorted, max_to_print, system_memory); return sorted.front(); } @@ -63,12 +63,19 @@ RetriableLIFOWorkerKillingPolicy::SelectWorkerToKill( std::string WorkerKillingPolicy::WorkersDebugString( const std::vector> &workers, int32_t num_workers, - const MemoryMonitor &memory_monitor) { + const MemorySnapshot &system_memory) { std::stringstream result; int64_t index = 1; for (auto &worker : workers) { auto pid = worker->GetProcess().GetId(); - auto used_memory = memory_monitor.GetProcessMemoryBytes(pid); + int64_t used_memory = 0; + const auto pid_entry = system_memory.process_used_bytes.find(pid); + if (pid_entry != system_memory.process_used_bytes.end()) { + used_memory = pid_entry->second; + } else { + RAY_LOG_EVERY_MS(INFO, 60000) + << "Can't find memory usage for PID, reporting zero. PID: " << pid; + } result << "Worker " << index << ": task assigned time counter " << worker->GetAssignedTaskTime().time_since_epoch().count() << " worker id " << worker->WorkerId() << " memory used " << used_memory << " task spec " diff --git a/src/ray/raylet/worker_killing_policy.h b/src/ray/raylet/worker_killing_policy.h index 3a774b304847..b2063a9bd809 100644 --- a/src/ray/raylet/worker_killing_policy.h +++ b/src/ray/raylet/worker_killing_policy.h @@ -32,11 +32,12 @@ class WorkerKillingPolicy { /// Selects a worker to be killed. /// /// \param workers the list of candidate workers. + /// \param system_memory snapshot of memory usage. /// /// \return the worker to kill, or nullptr if the worker list is empty. 
diff --git a/src/ray/raylet/worker_killing_policy.h b/src/ray/raylet/worker_killing_policy.h
index 3a774b304847..b2063a9bd809 100644
--- a/src/ray/raylet/worker_killing_policy.h
+++ b/src/ray/raylet/worker_killing_policy.h
@@ -32,11 +32,12 @@ class WorkerKillingPolicy {
   /// Selects a worker to be killed.
   ///
   /// \param workers the list of candidate workers.
+  /// \param system_memory snapshot of memory usage.
   ///
   /// \return the worker to kill, or nullptr if the worker list is empty.
   virtual const std::shared_ptr<WorkerInterface> SelectWorkerToKill(
       const std::vector<std::shared_ptr<WorkerInterface>> &workers,
-      const MemoryMonitor &memory_monitor) const = 0;
+      const MemorySnapshot &system_memory) const = 0;

   virtual ~WorkerKillingPolicy() {}

@@ -46,12 +47,13 @@
   /// \param workers The workers to be printed.
   /// \param num_workers The number of workers to print starting from the beginning of the
   /// worker list.
+  /// \param system_memory snapshot of memory usage.
   ///
   /// \return the debug string.
   static std::string WorkersDebugString(
       const std::vector<std::shared_ptr<WorkerInterface>> &workers,
       int32_t num_workers,
-      const MemoryMonitor &memory_monitor);
+      const MemorySnapshot &system_memory);
 };

 /// Prefers killing retriable workers over non-retriable ones, in LIFO order.
@@ -60,7 +62,7 @@ class RetriableLIFOWorkerKillingPolicy : public WorkerKillingPolicy {
   RetriableLIFOWorkerKillingPolicy();
   const std::shared_ptr<WorkerInterface> SelectWorkerToKill(
       const std::vector<std::shared_ptr<WorkerInterface>> &workers,
-      const MemoryMonitor &memory_monitor) const;
+      const MemorySnapshot &system_memory) const;
 };

 }  // namespace raylet

diff --git a/src/ray/raylet/worker_killing_policy_test.cc b/src/ray/raylet/worker_killing_policy_test.cc
index b00b11ff743a..2630741fb733 100644
--- a/src/ray/raylet/worker_killing_policy_test.cc
+++ b/src/ray/raylet/worker_killing_policy_test.cc
@@ -27,14 +27,6 @@ namespace raylet {
 class WorkerKillerTest : public ::testing::Test {
  protected:
   instrumented_io_context io_context_;
-  MemoryMonitor memory_monitor_ = {
-      io_context_,
-      0 /*usage_threshold*/,
-      -1 /*min_memory_free_bytes*/,
-      0 /*refresh_interval_ms*/,
-      [](bool is_usage_above_threshold,
-         MemorySnapshot system_memory,
-         float usage_threshold) { FAIL() << "Monitor should not be running"; }};
   int32_t port_ = 2389;
   RetriableLIFOWorkerKillingPolicy worker_killing_policy_;

@@ -75,7 +67,7 @@
 TEST_F(WorkerKillerTest, TestEmptyWorkerPoolSelectsNullWorker) {
   std::vector<std::shared_ptr<WorkerInterface>> workers;
   std::shared_ptr<WorkerInterface> worker_to_kill =
-      worker_killing_policy_.SelectWorkerToKill(workers, memory_monitor_);
+      worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot());
   ASSERT_TRUE(worker_to_kill == nullptr);
 }

@@ -108,7 +100,7 @@ TEST_F(WorkerKillerTest,
   for (const auto &expected : expected_order) {
     std::shared_ptr<WorkerInterface> worker_to_kill =
-        worker_killing_policy_.SelectWorkerToKill(workers, memory_monitor_);
+        worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot());
     ASSERT_EQ(worker_to_kill->WorkerId(), expected->WorkerId());
     workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill),
                   workers.end());