Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] oom killer policy: group by owner id #31272

Merged
merged 25 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1410fce
[core] add option for raylet to inform whether a task should be retried
clarng Dec 20, 2022
f6f33b2
[core] add option for raylet to inform whether a task should be retried
clarng Dec 21, 2022
b03de17
[core] group by owner policy
clarng Dec 21, 2022
316b823
[core] group by owner policy
clarng Dec 21, 2022
68ca544
[core] group by owner policy
clarng Dec 22, 2022
e1b04c8
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 4, 2023
a7f9a9c
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 4, 2023
6edb922
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 5, 2023
0b85002
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 10, 2023
fd959af
[core] oom killer policy: group by owner id
clarng Jan 10, 2023
db1305b
[core] oom killer policy: group by owner id
clarng Jan 11, 2023
6cf64f3
[core] oom killer policy: group by owner id
clarng Jan 11, 2023
de6b9d4
[core] oom killer policy: group by owner id
clarng Jan 11, 2023
a4442a5
[core] oom killer policy: group by owner id
clarng Jan 11, 2023
3650050
[core] oom killer policy: group by owner id
clarng Jan 11, 2023
12ece5e
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 11, 2023
3eded6e
[core] oom killer policy: group by owner id
clarng Jan 16, 2023
2675d65
[core] oom killer policy: group by owner id
clarng Jan 16, 2023
ea26166
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 16, 2023
3269f65
[core] oom killer policy: group by owner id
clarng Jan 17, 2023
9b5b5a4
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 17, 2023
2211169
[core] oom killer policy: group by owner id
clarng Jan 19, 2023
b661ef5
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 19, 2023
417ba36
Merge branch 'master' of https://github.com/ray-project/ray into gpolicy
clarng Jan 19, 2023
afcb951
[core] oom killer policy: group by owner id
clarng Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1651,9 +1651,21 @@ cc_test(
],
copts = COPTS,
tags = ["team:core"],
target_compatible_with = [
"@platforms//os:linux",
deps = [
":ray_common",
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "worker_killing_policy_group_by_owner_test",
size = "small",
srcs = [
"src/ray/raylet/worker_killing_policy_group_by_owner_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_common",
":raylet_lib",
Expand Down
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ RAY_CONFIG(uint64_t, task_failure_entry_ttl_ms, 15 * 60 * 1000)
/// that is not related to running out of memory. Retries indefinitely if the value is -1.
RAY_CONFIG(uint64_t, task_oom_retries, 15)

/// The worker killing policy to use, as defined in worker_killing_policy.h.
RAY_CONFIG(std::string, worker_killing_policy, "retriable_lifo")
clarng marked this conversation as resolved.
Show resolved Hide resolved

/// If the raylet fails to get agent info, we will retry after this interval.
RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)

Expand Down
5 changes: 3 additions & 2 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
next_resource_seq_no_(0),
ray_syncer_(io_service_, self_node_id_.Binary()),
ray_syncer_service_(ray_syncer_),
worker_killing_policy_(
CreateWorkerKillingPolicy(RayConfig::instance().worker_killing_policy())),
memory_monitor_(std::make_unique<MemoryMonitor>(
io_service,
RayConfig::instance().memory_usage_threshold(),
Expand Down Expand Up @@ -2869,9 +2871,8 @@ MemoryUsageRefreshCallback NodeManager::CreateMemoryUsageRefreshCallback() {
<< "idle worker are occupying most of the memory.";
return;
}
RetriableLIFOWorkerKillingPolicy worker_killing_policy;
auto worker_to_kill_and_should_retry =
worker_killing_policy.SelectWorkerToKill(workers, system_memory);
worker_killing_policy_->SelectWorkerToKill(workers, system_memory);
auto worker_to_kill = worker_to_kill_and_should_retry.first;
bool should_retry = worker_to_kill_and_should_retry.second;
if (worker_to_kill == nullptr) {
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/bundle_spec.h"
#include "ray/raylet/placement_group_resource_manager.h"
#include "ray/raylet/worker_killing_policy.h"
// clang-format on

namespace ray {
Expand Down Expand Up @@ -833,6 +834,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// RaySyncerService for gRPC
syncer::RaySyncerService ray_syncer_service_;

/// The Policy for selecting the worker to kill when the node runs out of memory.
std::shared_ptr<WorkerKillingPolicy> worker_killing_policy_;

/// Monitors and reports node memory usage and whether it is above threshold.
std::unique_ptr<MemoryMonitor> memory_monitor_;
};
Expand Down
25 changes: 10 additions & 15 deletions src/ray/raylet/test/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class MockWorker : public WorkerInterface {
: worker_id_(worker_id),
port_(port),
is_detached_actor_(false),
runtime_env_hash_(runtime_env_hash) {}
runtime_env_hash_(runtime_env_hash),
job_id_(JobID::FromInt(859)) {}

WorkerID WorkerId() const override { return worker_id_; }

Expand All @@ -34,16 +35,14 @@ class MockWorker : public WorkerInterface {

void SetOwnerAddress(const rpc::Address &address) override { address_ = address; }

void AssignTaskId(const TaskID &task_id) override {}
void AssignTaskId(const TaskID &task_id) override { task_id_ = task_id; }

void SetAssignedTask(const RayTask &assigned_task) override {
task_ = assigned_task;
task_assign_time_ = std::chrono::steady_clock::now();
task_assign_time_ = absl::Now();
};

const std::chrono::steady_clock::time_point GetAssignedTaskTime() const override {
return task_assign_time_;
};
absl::Time GetAssignedTaskTime() const override { return task_assign_time_; };

const std::string IpAddress() const override { return address_.ip_address(); }

Expand Down Expand Up @@ -95,10 +94,7 @@ class MockWorker : public WorkerInterface {
return -1;
}
void SetAssignedPort(int port) override { RAY_CHECK(false) << "Method unused"; }
const TaskID &GetAssignedTaskId() const override {
RAY_CHECK(false) << "Method unused";
return TaskID::Nil();
}
const TaskID &GetAssignedTaskId() const override { return task_id_; }
bool AddBlockedTaskId(const TaskID &task_id) override {
RAY_CHECK(false) << "Method unused";
return false;
Expand All @@ -112,10 +108,7 @@ class MockWorker : public WorkerInterface {
auto *t = new std::unordered_set<TaskID>();
return *t;
}
const JobID &GetAssignedJobId() const override {
RAY_CHECK(false) << "Method unused";
return JobID::Nil();
}
const JobID &GetAssignedJobId() const override { return job_id_; }
int GetRuntimeEnvHash() const override { return runtime_env_hash_; }
void AssignActorId(const ActorID &actor_id) override {
RAY_CHECK(false) << "Method unused";
Expand Down Expand Up @@ -189,8 +182,10 @@ class MockWorker : public WorkerInterface {
BundleID bundle_id_;
bool blocked_ = false;
RayTask task_;
std::chrono::steady_clock::time_point task_assign_time_;
absl::Time task_assign_time_;
int runtime_env_hash_;
TaskID task_id_;
JobID job_id_;
};

} // namespace raylet
Expand Down
14 changes: 7 additions & 7 deletions src/ray/raylet/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <memory>

#include "absl/memory/memory.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest_prod.h"
#include "ray/common/client_connection.h"
#include "ray/common/id.h"
Expand Down Expand Up @@ -109,7 +111,7 @@ class WorkerInterface {
virtual bool IsAvailableForScheduling() const = 0;

/// Time when the last task was assigned to this worker.
virtual const std::chrono::steady_clock::time_point GetAssignedTaskTime() const = 0;
virtual absl::Time GetAssignedTaskTime() const = 0;

protected:
virtual void SetStartupToken(StartupToken startup_token) = 0;
Expand Down Expand Up @@ -213,12 +215,10 @@ class Worker : public WorkerInterface {

void SetAssignedTask(const RayTask &assigned_task) {
assigned_task_ = assigned_task;
task_assign_time_ = std::chrono::steady_clock::now();
};
task_assign_time_ = absl::Now();
}

const std::chrono::steady_clock::time_point GetAssignedTaskTime() const {
return task_assign_time_;
};
absl::Time GetAssignedTaskTime() const { return task_assign_time_; };

bool IsRegistered() { return rpc_client_ != nullptr; }

Expand Down Expand Up @@ -298,7 +298,7 @@ class Worker : public WorkerInterface {
/// RayTask being assigned to this worker.
RayTask assigned_task_;
/// Time when the last task was assigned to this worker.
std::chrono::steady_clock::time_point task_assign_time_;
absl::Time task_assign_time_;
/// If true, a RPC need to be sent to notify the worker about GCS restarting.
bool notify_gcs_restarted_ = false;
};
Expand Down
25 changes: 22 additions & 3 deletions src/ray/raylet/worker_killing_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/raylet/worker.h"
#include "ray/raylet/worker_killing_policy_group_by_owner.h"
#include "ray/raylet/worker_pool.h"

namespace ray {
Expand Down Expand Up @@ -76,10 +77,12 @@ std::string WorkerKillingPolicy::WorkersDebugString(
RAY_LOG_EVERY_MS(INFO, 60000)
<< "Can't find memory usage for PID, reporting zero. PID: " << pid;
}
result << "Worker " << index << ": task assigned time counter "
<< worker->GetAssignedTaskTime().time_since_epoch().count() << " worker id "
<< worker->WorkerId() << " memory used " << used_memory << " task spec "
result << "Worker " << index << ": task assigned time "
<< absl::FormatTime(worker->GetAssignedTaskTime(), absl::UTCTimeZone())
<< " worker id " << worker->WorkerId() << " memory used " << used_memory
<< " task spec "
<< worker->GetAssignedTask().GetTaskSpecification().DebugString() << "\n";

index += 1;
if (index > num_workers) {
break;
Expand All @@ -88,6 +91,22 @@ std::string WorkerKillingPolicy::WorkersDebugString(
return result.str();
}

std::shared_ptr<WorkerKillingPolicy> CreateWorkerKillingPolicy(
std::string killing_policy_str) {
if (killing_policy_str == kLifoPolicy) {
RAY_LOG(INFO) << "Running RetriableLIFO policy.";
return std::make_shared<RetriableLIFOWorkerKillingPolicy>();
} else if (killing_policy_str == kGroupByOwner) {
RAY_LOG(INFO) << "Running GroupByOwner policy.";
return std::make_shared<GroupByOwnerIdWorkerKillingPolicy>();
} else {
RAY_LOG(ERROR)
<< killing_policy_str
<< " is an invalid killing policy. Defaulting to RetriableLIFO policy.";
return std::make_shared<RetriableLIFOWorkerKillingPolicy>();
}
}

} // namespace raylet

} // namespace ray
6 changes: 6 additions & 0 deletions src/ray/raylet/worker_killing_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ namespace ray {

namespace raylet {

constexpr char kLifoPolicy[] = "retriable_lifo";
constexpr char kGroupByOwner[] = "group_by_owner";

/// Provides the policy on which worker to prioritize killing.
class WorkerKillingPolicy {
public:
Expand Down Expand Up @@ -65,6 +68,9 @@ class RetriableLIFOWorkerKillingPolicy : public WorkerKillingPolicy {
const MemorySnapshot &system_memory) const;
};

std::shared_ptr<WorkerKillingPolicy> CreateWorkerKillingPolicy(
std::string killing_policy_str);

} // namespace raylet

} // namespace ray
Loading