Skip to content

Commit

Permalink
[core] oom killer policy: group by owner id (ray-project#31272)
Browse files Browse the repository at this point in the history
Group by owner worker killing policy. Comparing to the current oom killer policy (simple lifo) it produces less thrashing as it tries to allow tasks of the same owner (the task caller) to execute in favor of tasks from a different owner, thereby allowing progress and freeing up of resources when the group of tasks complete.

Helps in cases such as Tune multiple trials where multiple groups of datasets are processed in parallel, with this policy it will try to process datasets so it completes one by one to reduce resource contention


Signed-off-by: Clarence Ng <[email protected]>
Signed-off-by: Andrea Pisoni <[email protected]>
  • Loading branch information
clarng authored and andreapiso committed Jan 22, 2023
1 parent 9040fb7 commit 8f9d87b
Show file tree
Hide file tree
Showing 12 changed files with 576 additions and 31 deletions.
16 changes: 14 additions & 2 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1651,9 +1651,21 @@ cc_test(
],
copts = COPTS,
tags = ["team:core"],
target_compatible_with = [
"@platforms//os:linux",
deps = [
":ray_common",
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "worker_killing_policy_group_by_owner_test",
size = "small",
srcs = [
"src/ray/raylet/worker_killing_policy_group_by_owner_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_common",
":raylet_lib",
Expand Down
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ RAY_CONFIG(uint64_t, task_failure_entry_ttl_ms, 15 * 60 * 1000)
/// that is not related to running out of memory. Retries indefinitely if the value is -1.
RAY_CONFIG(uint64_t, task_oom_retries, 15)

/// The worker killing policy to use, as defined in worker_killing_policy.h.
RAY_CONFIG(std::string, worker_killing_policy, "retriable_lifo")

/// If the raylet fails to get agent info, we will retry after this interval.
RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)

Expand Down
5 changes: 3 additions & 2 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
next_resource_seq_no_(0),
ray_syncer_(io_service_, self_node_id_.Binary()),
ray_syncer_service_(ray_syncer_),
worker_killing_policy_(
CreateWorkerKillingPolicy(RayConfig::instance().worker_killing_policy())),
memory_monitor_(std::make_unique<MemoryMonitor>(
io_service,
RayConfig::instance().memory_usage_threshold(),
Expand Down Expand Up @@ -2869,9 +2871,8 @@ MemoryUsageRefreshCallback NodeManager::CreateMemoryUsageRefreshCallback() {
<< "idle worker are occupying most of the memory.";
return;
}
RetriableLIFOWorkerKillingPolicy worker_killing_policy;
auto worker_to_kill_and_should_retry =
worker_killing_policy.SelectWorkerToKill(workers, system_memory);
worker_killing_policy_->SelectWorkerToKill(workers, system_memory);
auto worker_to_kill = worker_to_kill_and_should_retry.first;
bool should_retry = worker_to_kill_and_should_retry.second;
if (worker_to_kill == nullptr) {
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/bundle_spec.h"
#include "ray/raylet/placement_group_resource_manager.h"
#include "ray/raylet/worker_killing_policy.h"
// clang-format on

namespace ray {
Expand Down Expand Up @@ -833,6 +834,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// RaySyncerService for gRPC
syncer::RaySyncerService ray_syncer_service_;

/// The Policy for selecting the worker to kill when the node runs out of memory.
std::shared_ptr<WorkerKillingPolicy> worker_killing_policy_;

/// Monitors and reports node memory usage and whether it is above threshold.
std::unique_ptr<MemoryMonitor> memory_monitor_;
};
Expand Down
25 changes: 10 additions & 15 deletions src/ray/raylet/test/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class MockWorker : public WorkerInterface {
: worker_id_(worker_id),
port_(port),
is_detached_actor_(false),
runtime_env_hash_(runtime_env_hash) {}
runtime_env_hash_(runtime_env_hash),
job_id_(JobID::FromInt(859)) {}

WorkerID WorkerId() const override { return worker_id_; }

Expand All @@ -34,16 +35,14 @@ class MockWorker : public WorkerInterface {

void SetOwnerAddress(const rpc::Address &address) override { address_ = address; }

void AssignTaskId(const TaskID &task_id) override {}
void AssignTaskId(const TaskID &task_id) override { task_id_ = task_id; }

void SetAssignedTask(const RayTask &assigned_task) override {
task_ = assigned_task;
task_assign_time_ = std::chrono::steady_clock::now();
task_assign_time_ = absl::Now();
};

const std::chrono::steady_clock::time_point GetAssignedTaskTime() const override {
return task_assign_time_;
};
absl::Time GetAssignedTaskTime() const override { return task_assign_time_; };

const std::string IpAddress() const override { return address_.ip_address(); }

Expand Down Expand Up @@ -95,10 +94,7 @@ class MockWorker : public WorkerInterface {
return -1;
}
void SetAssignedPort(int port) override { RAY_CHECK(false) << "Method unused"; }
const TaskID &GetAssignedTaskId() const override {
RAY_CHECK(false) << "Method unused";
return TaskID::Nil();
}
const TaskID &GetAssignedTaskId() const override { return task_id_; }
bool AddBlockedTaskId(const TaskID &task_id) override {
RAY_CHECK(false) << "Method unused";
return false;
Expand All @@ -112,10 +108,7 @@ class MockWorker : public WorkerInterface {
auto *t = new std::unordered_set<TaskID>();
return *t;
}
const JobID &GetAssignedJobId() const override {
RAY_CHECK(false) << "Method unused";
return JobID::Nil();
}
const JobID &GetAssignedJobId() const override { return job_id_; }
int GetRuntimeEnvHash() const override { return runtime_env_hash_; }
void AssignActorId(const ActorID &actor_id) override {
RAY_CHECK(false) << "Method unused";
Expand Down Expand Up @@ -189,8 +182,10 @@ class MockWorker : public WorkerInterface {
BundleID bundle_id_;
bool blocked_ = false;
RayTask task_;
std::chrono::steady_clock::time_point task_assign_time_;
absl::Time task_assign_time_;
int runtime_env_hash_;
TaskID task_id_;
JobID job_id_;
};

} // namespace raylet
Expand Down
14 changes: 7 additions & 7 deletions src/ray/raylet/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <memory>

#include "absl/memory/memory.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest_prod.h"
#include "ray/common/client_connection.h"
#include "ray/common/id.h"
Expand Down Expand Up @@ -109,7 +111,7 @@ class WorkerInterface {
virtual bool IsAvailableForScheduling() const = 0;

/// Time when the last task was assigned to this worker.
virtual const std::chrono::steady_clock::time_point GetAssignedTaskTime() const = 0;
virtual absl::Time GetAssignedTaskTime() const = 0;

protected:
virtual void SetStartupToken(StartupToken startup_token) = 0;
Expand Down Expand Up @@ -213,12 +215,10 @@ class Worker : public WorkerInterface {

void SetAssignedTask(const RayTask &assigned_task) {
assigned_task_ = assigned_task;
task_assign_time_ = std::chrono::steady_clock::now();
};
task_assign_time_ = absl::Now();
}

const std::chrono::steady_clock::time_point GetAssignedTaskTime() const {
return task_assign_time_;
};
absl::Time GetAssignedTaskTime() const { return task_assign_time_; };

bool IsRegistered() { return rpc_client_ != nullptr; }

Expand Down Expand Up @@ -298,7 +298,7 @@ class Worker : public WorkerInterface {
/// RayTask being assigned to this worker.
RayTask assigned_task_;
/// Time when the last task was assigned to this worker.
std::chrono::steady_clock::time_point task_assign_time_;
absl::Time task_assign_time_;
/// If true, a RPC need to be sent to notify the worker about GCS restarting.
bool notify_gcs_restarted_ = false;
};
Expand Down
25 changes: 22 additions & 3 deletions src/ray/raylet/worker_killing_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/raylet/worker.h"
#include "ray/raylet/worker_killing_policy_group_by_owner.h"
#include "ray/raylet/worker_pool.h"

namespace ray {
Expand Down Expand Up @@ -76,10 +77,12 @@ std::string WorkerKillingPolicy::WorkersDebugString(
RAY_LOG_EVERY_MS(INFO, 60000)
<< "Can't find memory usage for PID, reporting zero. PID: " << pid;
}
result << "Worker " << index << ": task assigned time counter "
<< worker->GetAssignedTaskTime().time_since_epoch().count() << " worker id "
<< worker->WorkerId() << " memory used " << used_memory << " task spec "
result << "Worker " << index << ": task assigned time "
<< absl::FormatTime(worker->GetAssignedTaskTime(), absl::UTCTimeZone())
<< " worker id " << worker->WorkerId() << " memory used " << used_memory
<< " task spec "
<< worker->GetAssignedTask().GetTaskSpecification().DebugString() << "\n";

index += 1;
if (index > num_workers) {
break;
Expand All @@ -88,6 +91,22 @@ std::string WorkerKillingPolicy::WorkersDebugString(
return result.str();
}

std::shared_ptr<WorkerKillingPolicy> CreateWorkerKillingPolicy(
std::string killing_policy_str) {
if (killing_policy_str == kLifoPolicy) {
RAY_LOG(INFO) << "Running RetriableLIFO policy.";
return std::make_shared<RetriableLIFOWorkerKillingPolicy>();
} else if (killing_policy_str == kGroupByOwner) {
RAY_LOG(INFO) << "Running GroupByOwner policy.";
return std::make_shared<GroupByOwnerIdWorkerKillingPolicy>();
} else {
RAY_LOG(ERROR)
<< killing_policy_str
<< " is an invalid killing policy. Defaulting to RetriableLIFO policy.";
return std::make_shared<RetriableLIFOWorkerKillingPolicy>();
}
}

} // namespace raylet

} // namespace ray
6 changes: 6 additions & 0 deletions src/ray/raylet/worker_killing_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ namespace ray {

namespace raylet {

constexpr char kLifoPolicy[] = "retriable_lifo";
constexpr char kGroupByOwner[] = "group_by_owner";

/// Provides the policy on which worker to prioritize killing.
class WorkerKillingPolicy {
public:
Expand Down Expand Up @@ -65,6 +68,9 @@ class RetriableLIFOWorkerKillingPolicy : public WorkerKillingPolicy {
const MemorySnapshot &system_memory) const;
};

std::shared_ptr<WorkerKillingPolicy> CreateWorkerKillingPolicy(
std::string killing_policy_str);

} // namespace raylet

} // namespace ray
Loading

0 comments on commit 8f9d87b

Please sign in to comment.