From 1410fcec5c3438dd3e50509537415efcbdcabde8 Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Mon, 19 Dec 2022 21:30:58 -0800 Subject: [PATCH 01/16] [core] add option for raylet to inform whether a task should be retried Signed-off-by: Clarence Ng --- src/mock/ray/core_worker/task_manager.h | 3 +- src/ray/common/task/task_util.h | 14 +++-- src/ray/core_worker/task_manager.cc | 11 ++-- src/ray/core_worker/task_manager.h | 8 ++- .../test/dependency_resolver_test.cc | 3 +- .../test/direct_actor_transport_mock_test.cc | 4 +- .../test/direct_actor_transport_test.cc | 32 ++++++------ .../test/direct_task_transport_mock_test.cc | 7 +-- .../test/direct_task_transport_test.cc | 3 +- src/ray/core_worker/test/task_manager_test.cc | 52 +++++++++++++++++++ .../transport/direct_actor_task_submitter.cc | 3 +- .../transport/direct_task_transport.cc | 5 +- src/ray/protobuf/node_manager.proto | 1 + src/ray/raylet/node_manager.cc | 12 +++-- src/ray/raylet/node_manager.h | 3 +- src/ray/raylet/worker_killing_policy.cc | 6 +-- src/ray/raylet/worker_killing_policy.h | 6 +-- src/ray/raylet/worker_killing_policy_test.cc | 6 ++- 18 files changed, 132 insertions(+), 47 deletions(-) diff --git a/src/mock/ray/core_worker/task_manager.h b/src/mock/ray/core_worker/task_manager.h index 413efc69be96..31b8dc39bbc3 100644 --- a/src/mock/ray/core_worker/task_manager.h +++ b/src/mock/ray/core_worker/task_manager.h @@ -39,7 +39,8 @@ class MockTaskFinisherInterface : public TaskFinisherInterface { rpc::ErrorType error_type, const Status *status, const rpc::RayErrorInfo *ray_error_info, - bool mark_task_object_failed), + bool mark_task_object_failed, + bool fail_immediately), (override)); MOCK_METHOD(void, OnTaskDependenciesInlined, diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 9d0761a9d60b..78766880f317 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -21,12 +21,20 @@ namespace ray { -/// Stores the task failure reason and when this entry was created. +/// Stores the task failure reason. struct TaskFailureEntry { + /// The task failure details. rpc::RayErrorInfo ray_error_info; + + /// The creation time of this entry. std::chrono::steady_clock::time_point creation_time; - TaskFailureEntry(const rpc::RayErrorInfo &ray_error_info) - : ray_error_info(ray_error_info), creation_time(std::chrono::steady_clock::now()) {} + + /// Whether this task should be retried. + bool should_retry; + TaskFailureEntry(const rpc::RayErrorInfo &ray_error_info, bool should_retry) + : ray_error_info(ray_error_info), + creation_time(std::chrono::steady_clock::now()), + should_retry(should_retry) {} }; /// Argument of a task. diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 42328cc1b74f..6a377edfd088 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -540,13 +540,18 @@ bool TaskManager::FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type, const Status *status, const rpc::RayErrorInfo *ray_error_info, - bool mark_task_object_failed) { + bool mark_task_object_failed, + bool fail_immediately) { // Note that this might be the __ray_terminate__ task, so we don't log // loudly with ERROR here. 
RAY_LOG(DEBUG) << "Task attempt " << task_id << " failed with error " << rpc::ErrorType_Name(error_type); - const bool will_retry = RetryTaskIfPossible( - task_id, /*task_failed_due_to_oom*/ error_type == rpc::ErrorType::OUT_OF_MEMORY); + bool will_retry = false; + if (!fail_immediately) { + will_retry = RetryTaskIfPossible( + task_id, /*task_failed_due_to_oom*/ error_type == rpc::ErrorType::OUT_OF_MEMORY); + } + if (!will_retry && mark_task_object_failed) { FailPendingTask(task_id, error_type, status, ray_error_info); } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index a931002d0575..264b4e982b51 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -49,7 +49,8 @@ class TaskFinisherInterface { rpc::ErrorType error_type, const Status *status, const rpc::RayErrorInfo *ray_error_info = nullptr, - bool mark_task_object_failed = true) = 0; + bool mark_task_object_failed = true, + bool fail_immediately = false) = 0; virtual void MarkTaskWaitingForExecution(const TaskID &task_id, const NodeID &node_id) = 0; @@ -185,12 +186,15 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// \param[in] mark_task_object_failed whether or not it marks the task /// return object as failed. If this is set to false, then the caller is /// responsible for later failing or completing the task. + /// \param[in] fail_immediately whether to fail the task and ignore + /// the retries that are available. /// \return Whether the task will be retried or not. bool FailOrRetryPendingTask(const TaskID &task_id, rpc::ErrorType error_type, const Status *status = nullptr, const rpc::RayErrorInfo *ray_error_info = nullptr, - bool mark_task_object_failed = true) override; + bool mark_task_object_failed = true, + bool fail_immediately = false) override; /// A pending task failed. This will mark the task as failed. 
/// This doesn't always mark the return object as failed diff --git a/src/ray/core_worker/test/dependency_resolver_test.cc b/src/ray/core_worker/test/dependency_resolver_test.cc index be21907fefe8..1916c9c63263 100644 --- a/src/ray/core_worker/test/dependency_resolver_test.cc +++ b/src/ray/core_worker/test/dependency_resolver_test.cc @@ -82,7 +82,8 @@ class MockTaskFinisher : public TaskFinisherInterface { rpc::ErrorType error_type, const Status *status, const rpc::RayErrorInfo *ray_error_info = nullptr, - bool mark_task_object_failed = true) override { + bool mark_task_object_failed = true, + bool fail_immediately = false) override { num_tasks_failed++; return true; } diff --git a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc index 1d440a511993..4e07d45a4ea7 100644 --- a/src/ray/core_worker/test/direct_actor_transport_mock_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_mock_test.cc @@ -97,7 +97,7 @@ TEST_F(DirectTaskTransportTest, ActorRegisterFailure) { EXPECT_CALL( *task_finisher, FailOrRetryPendingTask( - task_spec.TaskId(), rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _)); + task_spec.TaskId(), rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, _, _, _, _)); register_cb(Status::IOError("")); } @@ -119,7 +119,7 @@ TEST_F(DirectTaskTransportTest, ActorRegisterOk) { ASSERT_TRUE(actor_creator->IsActorInRegistering(actor_id)); actor_task_submitter->AddActorQueueIfNotExists(actor_id, -1); ASSERT_TRUE(CheckSubmitTask(task_spec)); - EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(_, _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher, FailOrRetryPendingTask(_, _, _, _, _, _)).Times(0); register_cb(Status::OK()); } diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index c79a7f9927f5..9ce8a1d8bbc4 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -165,7 +165,7 @@ TEST_P(DirectActorSubmitterTest, TestSubmitTask) { EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _, _)) .Times(worker_client_->callbacks.size()); - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _, _)).Times(0); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask()); } @@ -314,18 +314,18 @@ TEST_P(DirectActorSubmitterTest, TestActorDead) { ASSERT_EQ(worker_client_->callbacks.size(), 1); // Simulate the actor dying. All in-flight tasks should get failed. - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task1.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task1.TaskId(), _, _, _, _, _)) .Times(1); EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _, _)).Times(0); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); } - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _)).Times(0); + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(_, _, _, _, _, _)).Times(0); const auto death_cause = CreateMockDeathCause(); submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause); // Actor marked as dead. All queued tasks should get failed. 
- EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _)) .Times(1); submitter_.DisconnectActor(actor_id, 2, /*dead=*/true, death_cause); } @@ -352,9 +352,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartNoRetry) { ASSERT_TRUE(CheckSubmitTask(task3)); EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _, _)).Times(1); - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _)) .Times(1); - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _, _)) .Times(1); EXPECT_CALL(*task_finisher_, CompletePendingTask(task4.TaskId(), _, _, _)).Times(1); // First task finishes. Second task fails. @@ -407,10 +407,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartRetry) { // All tasks will eventually finish. EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _, _)).Times(4); // Tasks 2 and 3 will be retried. - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _)) .Times(1) .WillRepeatedly(Return(true)); - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _, _)) .Times(1) .WillRepeatedly(Return(true)); // First task finishes. Second task fails. @@ -467,7 +467,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) { EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _, _)).Times(3); // Tasks 2 will be retried - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _)) .Times(1) .WillRepeatedly(Return(true)); // First task finishes. Second task hang. Third task finishes. @@ -553,7 +553,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { // Tasks submitted when the actor is in RESTARTING state will fail immediately. // This happens in an io_service.post. Search `SendPendingTasks_ForceFail` to locate // the code. - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _, _)) .Times(1); ASSERT_EQ(io_context.poll_one(), 1); @@ -574,7 +574,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 4); - EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _)) + EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _, _)) .Times(1); ASSERT_FALSE(CheckSubmitTask(task)); } @@ -605,9 +605,9 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) { ASSERT_TRUE(CheckSubmitTask(task3)); // Actor failed, but the task replies are delayed (or in some scenarios, lost). // We should still be able to fail the inflight tasks. 
-  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
+  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _))
       .Times(1);
-  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _))
+  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _, _))
       .Times(1);
   const auto death_cause = CreateMockDeathCause();
   submitter_.DisconnectActor(actor_id, 1, /*dead=*/false, death_cause);
@@ -615,10 +615,10 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFailInflightTasks) {
   // The task replies are now received. Since the tasks are already failed, they will not
   // be marked as failed or finished again.
   EXPECT_CALL(*task_finisher_, CompletePendingTask(task2.TaskId(), _, _, _)).Times(0);
-  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
+  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _))
       .Times(0);
   EXPECT_CALL(*task_finisher_, CompletePendingTask(task3.TaskId(), _, _, _)).Times(0);
-  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _))
+  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task3.TaskId(), _, _, _, _, _))
       .Times(0);
   // Task 2 replied with OK.
   ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
@@ -652,7 +652,7 @@ TEST_P(DirectActorSubmitterTest, TestActorRestartFastFail) {
   auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
   ASSERT_TRUE(CheckSubmitTask(task2));
   EXPECT_CALL(*task_finisher_, CompletePendingTask(task2.TaskId(), _, _, _)).Times(0);
-  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _))
+  EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task2.TaskId(), _, _, _, _, _))
       .Times(1);
   ASSERT_EQ(io_context.poll_one(), 1);
 }
diff --git a/src/ray/core_worker/test/direct_task_transport_mock_test.cc b/src/ray/core_worker/test/direct_task_transport_mock_test.cc
index ba3ba9faf408..32520a6e9d73 100644
--- a/src/ray/core_worker/test/direct_task_transport_mock_test.cc
+++ b/src/ray/core_worker/test/direct_task_transport_mock_test.cc
@@ -81,9 +81,10 @@ TEST_F(DirectTaskTransportTest, ActorCreationFail) {
   auto actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
   auto task_spec = GetCreatingTaskSpec(actor_id);
   EXPECT_CALL(*task_finisher, CompletePendingTask(_, _, _, _)).Times(0);
-  EXPECT_CALL(*task_finisher,
-              FailOrRetryPendingTask(
-                  task_spec.TaskId(), rpc::ErrorType::ACTOR_CREATION_FAILED, _, _, true));
+  EXPECT_CALL(
+      *task_finisher,
+      FailOrRetryPendingTask(
+          task_spec.TaskId(), rpc::ErrorType::ACTOR_CREATION_FAILED, _, _, true, false));
   rpc::ClientCallback<rpc::CreateActorReply> create_cb;
   EXPECT_CALL(*actor_creator, AsyncCreateActor(task_spec, _))
       .WillOnce(DoAll(SaveArg<1>(&create_cb), Return(Status::OK())));
diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc
index 724e9914a571..02101a924241 100644
--- a/src/ray/core_worker/test/direct_task_transport_test.cc
+++ b/src/ray/core_worker/test/direct_task_transport_test.cc
@@ -122,7 +122,8 @@ class MockTaskFinisher : public TaskFinisherInterface {
       rpc::ErrorType error_type,
       const Status *status,
       const rpc::RayErrorInfo *ray_error_info = nullptr,
-      bool mark_task_object_failed = true) override {
+      bool mark_task_object_failed = true,
+      bool fail_immediately = false) override {
     num_tasks_failed++;
     return true;
   }
diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc
index 
0eec50faadb3..a22408e38238 100644
--- a/src/ray/core_worker/test/task_manager_test.cc
+++ b/src/ray/core_worker/test/task_manager_test.cc
@@ -453,6 +453,58 @@ TEST_F(TaskManagerTest, TestTaskNotRetriableOomFailsImmediatelyEvenWithOomRetryC
   ASSERT_EQ(stored_error, rpc::ErrorType::OUT_OF_MEMORY);
 }
 
+TEST_F(TaskManagerTest, TestFailsImmediatelyOverridesRetry) {
+  RayConfig::instance().initialize(R"({"task_oom_retries": 1})");
+
+  {
+    ray::rpc::ErrorType error = rpc::ErrorType::OUT_OF_MEMORY;
+
+    rpc::Address caller_address;
+    auto spec = CreateTaskHelper(1, {});
+    manager_.AddPendingTask(caller_address, spec, "", /*max retries*/ 10);
+    auto return_id = spec.ReturnId(0);
+
+    manager_.FailOrRetryPendingTask(spec.TaskId(),
+                                    error,
+                                    /*status*/ nullptr,
+                                    /*error info*/ nullptr,
+                                    /*mark object failed*/ true,
+                                    /*fail immediately*/ true);
+
+    std::vector<std::shared_ptr<RayObject>> results;
+    WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
+    RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, false, &results));
+    ASSERT_EQ(results.size(), 1);
+    rpc::ErrorType stored_error;
+    ASSERT_TRUE(results[0]->IsException(&stored_error));
+    ASSERT_EQ(stored_error, error);
+  }
+
+  {
+    ray::rpc::ErrorType error = rpc::ErrorType::WORKER_DIED;
+
+    rpc::Address caller_address;
+    auto spec = CreateTaskHelper(1, {});
+    manager_.AddPendingTask(caller_address, spec, "", /*max retries*/ 10);
+    auto return_id = spec.ReturnId(0);
+
+    manager_.FailOrRetryPendingTask(spec.TaskId(),
+                                    error,
+                                    /*status*/ nullptr,
+                                    /*error info*/ nullptr,
+                                    /*mark object failed*/ true,
+                                    /*fail immediately*/ true);
+
+    std::vector<std::shared_ptr<RayObject>> results;
+    WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
+    RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, false, &results));
+    ASSERT_EQ(results.size(), 1);
+    rpc::ErrorType stored_error;
+    ASSERT_TRUE(results[0]->IsException(&stored_error));
+    ASSERT_EQ(stored_error, error);
+  }
+}
+
 // Test to make sure that the task spec and dependencies for an object are
 // evicted when lineage pinning is disabled in the ReferenceCounter.
 TEST_F(TaskManagerTest, TestLineageEvicted) {
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
index 0276654c223b..ce098aa02602 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
@@ -522,7 +522,8 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
                       error_type,
                       &status,
                       &error_info,
-                      /*mark_task_object_failed*/ is_actor_dead);
+                      /*mark_task_object_failed*/ is_actor_dead,
+                      /*fail_immediately*/ false);
     if (!is_actor_dead && !will_retry) {
       // No retry == actor is dead.
diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc
index ac7bde2dfc0d..e3e4536c477b 100644
--- a/src/ray/core_worker/transport/direct_task_transport.cc
+++ b/src/ray/core_worker/transport/direct_task_transport.cc
@@ -648,6 +648,7 @@ void CoreWorkerDirectTaskSubmitter::HandleGetTaskFailureCause(
     const rpc::GetTaskFailureCauseReply &get_task_failure_cause_reply) {
   rpc::ErrorType task_error_type = rpc::ErrorType::WORKER_DIED;
   std::unique_ptr<rpc::RayErrorInfo> error_info;
+  bool fail_immediately = false;
   if (get_task_failure_cause_reply_status.ok()) {
     RAY_LOG(DEBUG) << "Task failure cause for task " << task_id << ": "
                    << ray::gcs::RayErrorInfoToString(
@@ -679,7 +680,9 @@ void CoreWorkerDirectTaskSubmitter::HandleGetTaskFailureCause(
       task_id,
       is_actor ? rpc::ErrorType::ACTOR_DIED : task_error_type,
       &task_execution_status,
-      error_info.get()));
+      error_info.get(),
+      /*mark_task_object_failed*/ true,
+      fail_immediately));
 }
 
 Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec,
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index d0dec8ef80d6..d5861747faab 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -345,6 +345,7 @@ message GetTaskFailureCauseRequest {
 
 message GetTaskFailureCauseReply {
   optional RayErrorInfo failure_cause = 1;
+  bool fail_task_immediately = 2;
 }
 
 // Service for inter-node-manager communication.
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index c6cec7527ff5..0433a96d6bfe 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2941,8 +2941,10 @@ MemoryUsageRefreshCallback NodeManager::CreateMemoryUsageRefreshCallback() {
           return;
         }
         RetriableLIFOWorkerKillingPolicy worker_killing_policy;
-        auto worker_to_kill =
+        auto worker_to_kill_and_should_retry =
             worker_killing_policy.SelectWorkerToKill(workers, system_memory);
+        auto worker_to_kill = worker_to_kill_and_should_retry.first;
+        bool should_retry = worker_to_kill_and_should_retry.second;
         if (worker_to_kill == nullptr) {
           RAY_LOG_EVERY_MS(WARNING, 5000) << "Worker killer did not select a worker to "
                                              "kill even though memory usage is high.";
@@ -2973,7 +2975,8 @@ MemoryUsageRefreshCallback NodeManager::CreateMemoryUsageRefreshCallback() {
           task_failure_reason.set_error_message(worker_exit_message);
           task_failure_reason.set_error_type(rpc::ErrorType::OUT_OF_MEMORY);
           SetTaskFailureReason(worker_to_kill->GetAssignedTaskId(),
-                               std::move(task_failure_reason));
+                               std::move(task_failure_reason),
+                               should_retry);
 
           /// since we print the process memory in the message. Destroy should be called
           /// as soon as possible to free up memory.
@@ -3069,9 +3072,10 @@ const std::string NodeManager::CreateOomKillMessageSuggestions(
 }
 
 void NodeManager::SetTaskFailureReason(const TaskID &task_id,
-                                       const rpc::RayErrorInfo &failure_reason) {
+                                       const rpc::RayErrorInfo &failure_reason,
+                                       bool should_retry) {
   RAY_LOG(DEBUG) << "set failure reason for task " << task_id;
-  ray::TaskFailureEntry entry(failure_reason);
+  ray::TaskFailureEntry entry(failure_reason, should_retry);
   auto result = task_failure_reasons_.emplace(task_id, std::move(entry));
   if (!result.second) {
     RAY_LOG(WARNING) << "Trying to insert failure reason more than once for the same "
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index f8640c6e5a7a..60193219121e 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -693,7 +693,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   /// Stores the failure reason for the task. The entry will be cleaned up by a periodic
   /// function post TTL.
   void SetTaskFailureReason(const TaskID &task_id,
-                            const rpc::RayErrorInfo &failure_reason);
+                            const rpc::RayErrorInfo &failure_reason,
+                            bool should_retry);
 
   /// Checks the expiry time of the task failures and garbage collect them.
   void GCTaskFailureReason();
diff --git a/src/ray/raylet/worker_killing_policy.cc b/src/ray/raylet/worker_killing_policy.cc
index 338fded1f4d0..f5e738f0193d 100644
--- a/src/ray/raylet/worker_killing_policy.cc
+++ b/src/ray/raylet/worker_killing_policy.cc
@@ -27,13 +27,13 @@ namespace raylet {
 
 RetriableLIFOWorkerKillingPolicy::RetriableLIFOWorkerKillingPolicy() {}
 
-const std::shared_ptr<WorkerInterface>
+const std::pair<std::shared_ptr<WorkerInterface>, bool>
 RetriableLIFOWorkerKillingPolicy::SelectWorkerToKill(
     const std::vector<std::shared_ptr<WorkerInterface>> &workers,
     const MemorySnapshot &system_memory) const {
   if (workers.empty()) {
     RAY_LOG_EVERY_MS(INFO, 5000) << "Worker list is empty. Nothing can be killed";
-    return nullptr;
+    return std::make_pair(nullptr, /*should retry*/ false);
   }
 
   std::vector<std::shared_ptr<WorkerInterface>> sorted = workers;
@@ -57,7 +57,7 @@ RetriableLIFOWorkerKillingPolicy::SelectWorkerToKill(
   RAY_LOG(INFO)
       << "The top 10 workers to be killed based on the worker killing policy:\n"
       << WorkersDebugString(sorted, max_to_print, system_memory);
-  return sorted.front();
+  return std::make_pair(sorted.front(), /*should retry*/ true);
 }
 
 std::string WorkerKillingPolicy::WorkersDebugString(
diff --git a/src/ray/raylet/worker_killing_policy.h b/src/ray/raylet/worker_killing_policy.h
index b2063a9bd809..d7ac5f43e484 100644
--- a/src/ray/raylet/worker_killing_policy.h
+++ b/src/ray/raylet/worker_killing_policy.h
@@ -34,8 +34,8 @@ class WorkerKillingPolicy {
   /// \param workers the list of candidate workers.
   /// \param system_memory snapshot of memory usage.
   ///
-  /// \return the worker to kill, or nullptr if the worker list is empty.
-  virtual const std::shared_ptr<WorkerInterface> SelectWorkerToKill(
+  /// \return the worker to kill and whethewr the task on the worker should be retried.
+  virtual const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
       const std::vector<std::shared_ptr<WorkerInterface>> &workers,
       const MemorySnapshot &system_memory) const = 0;
 
@@ -60,7 +60,7 @@ class WorkerKillingPolicy {
 class RetriableLIFOWorkerKillingPolicy : public WorkerKillingPolicy {
  public:
   RetriableLIFOWorkerKillingPolicy();
-  const std::shared_ptr<WorkerInterface> SelectWorkerToKill(
+  const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
       const std::vector<std::shared_ptr<WorkerInterface>> &workers,
       const MemorySnapshot &system_memory) const;
 };
diff --git a/src/ray/raylet/worker_killing_policy_test.cc b/src/ray/raylet/worker_killing_policy_test.cc
index 2630741fb733..ce2edbb0edd4 100644
--- a/src/ray/raylet/worker_killing_policy_test.cc
+++ b/src/ray/raylet/worker_killing_policy_test.cc
@@ -66,8 +66,9 @@ class WorkerKillerTest : public ::testing::Test {
 
 TEST_F(WorkerKillerTest, TestEmptyWorkerPoolSelectsNullWorker) {
   std::vector<std::shared_ptr<WorkerInterface>> workers;
-  std::shared_ptr<WorkerInterface> worker_to_kill =
+  auto worker_to_kill_and_should_retry =
       worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot());
+  auto worker_to_kill = worker_to_kill_and_should_retry.first;
   ASSERT_TRUE(worker_to_kill == nullptr);
 }
 
@@ -99,8 +100,9 @@ TEST_F(WorkerKillerTest,
   expected_order.push_back(first_submitted);
 
   for (const auto &expected : expected_order) {
-    std::shared_ptr<WorkerInterface> worker_to_kill =
+    auto worker_to_kill_and_should_retry =
         worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot());
+    auto worker_to_kill = worker_to_kill_and_should_retry.first;
     ASSERT_EQ(worker_to_kill->WorkerId(), expected->WorkerId());
     workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill),
                   workers.end());

From f6f33b27d319687cc5e3a2f590c8aa4120781a2b Mon Sep 17 00:00:00 2001
From: Clarence Ng
Date: Tue, 20 Dec 2022 17:41:14 -0800
Subject: [PATCH 02/16] [core] add option for raylet to inform whether a task
 should be retried

Signed-off-by: Clarence Ng
---
 src/ray/raylet/node_manager.cc | 11 ++++++++---
 src/ray/raylet/node_manager.h  |  3 ++-
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 0433a96d6bfe..559f98eb506b 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2952,8 +2952,12 @@ MemoryUsageRefreshCallback NodeManager::CreateMemoryUsageRefreshCallback() {
           high_memory_eviction_target_ = worker_to_kill;
 
           /// TODO: (clarng) expose these strings in the frontend python error as well.
-          std::string oom_kill_details = this->CreateOomKillMessageDetails(
-              worker_to_kill, this->self_node_id_, system_memory, usage_threshold);
+          std::string oom_kill_details =
+              this->CreateOomKillMessageDetails(worker_to_kill,
+                                                this->self_node_id_,
+                                                system_memory,
+                                                usage_threshold,
+                                                should_retry);
           std::string oom_kill_suggestions =
               this->CreateOomKillMessageSuggestions(worker_to_kill);
 
@@ -3002,7 +3006,8 @@ const std::string NodeManager::CreateOomKillMessageDetails(
     const std::shared_ptr<WorkerInterface> &worker,
     const NodeID &node_id,
     const MemorySnapshot &system_memory,
-    float usage_threshold) const {
+    float usage_threshold,
+    bool should_retry) const {
   float usage_fraction =
       static_cast<float>(system_memory.used_bytes) / system_memory.total_bytes;
   std::string used_bytes_gb =
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 60193219121e..1af98957906a 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -683,7 +683,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
       const std::shared_ptr<WorkerInterface> &worker,
       const NodeID &node_id,
       const MemorySnapshot &system_memory,
-      float usage_threshold) const;
+      float usage_threshold,
+      bool should_retry) const;
 
   /// Creates the suggestion message for the worker that is killed due to memory running
   /// low.

From b03de1736c6e428c4dd425587d531f3ff0760e85 Mon Sep 17 00:00:00 2001
From: Clarence Ng
Date: Wed, 21 Dec 2022 09:45:06 -0800
Subject: [PATCH 03/16] [core] group by owner policy

Signed-off-by: Clarence Ng
---
 .../worker_killing_policy_group_by_owner.cc   | 141 ++++++++++++++++++
 .../worker_killing_policy_group_by_owner.h    |  90 +++++++++++
 2 files changed, 231 insertions(+)
 create mode 100644 src/ray/raylet/worker_killing_policy_group_by_owner.cc
 create mode 100644 src/ray/raylet/worker_killing_policy_group_by_owner.h

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
new file mode 100644
index 000000000000..914f6a822e15
--- /dev/null
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
@@ -0,0 +1,141 @@
+// Copyright 2022 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "ray/raylet/worker_killing_policy.h"
+
+#include
+
+#include "ray/common/asio/instrumented_io_context.h"
+#include "ray/common/asio/periodical_runner.h"
+#include "ray/raylet/worker.h"
+#include "ray/raylet/worker_pool.h"
+#include "ray/raylet/worker_killing_policy_group_by_owner.h"
+#include "absl/container/flat_hash_map.h"
+#include <boost/container_hash/hash.hpp>
+#include <unordered_map>
+
+namespace ray {
+
+namespace raylet {
+
+GroupByOwnerIdWorkerKillingPolicy::GroupByOwnerIdWorkerKillingPolicy() {}
+
+const std::pair<std::shared_ptr<WorkerInterface>, bool>
+GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill(
+    const std::vector<std::shared_ptr<WorkerInterface>> &workers,
+    const MemorySnapshot &system_memory) const {
+  if (workers.empty()) {
+    RAY_LOG_EVERY_MS(INFO, 5000) << "Worker list is empty. Nothing can be killed";
+    return std::make_pair(nullptr, /*should retry*/ false);
+  }
+
+  GroupMap group_map(10, group_key_hashing_func, group_key_equal_fn);
+
+  for (auto worker : workers) {
+    TaskID owner_id = worker->GetAssignedTask().GetTaskSpecification().ParentTaskId();
+    bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
+
+    GroupKey group_key(owner_id, retriable);
+    auto it = group_map.find(group_key);
+
+    if (it == group_map.end()) {
+        Group group(owner_id, retriable);
+        group_map.insert({group_key, group});
+        group.AddToGroup(worker);
+    } else {
+        auto group = it->second;
+        group.AddToGroup(worker);
+    }
+  }
+
+  std::vector<Group> sorted;
+  for(auto it = group_map.begin(); it != group_map.end(); ++it) {
+    sorted.push_back(it->second);
+  }
+
+  std::sort(sorted.begin(),
+            sorted.end(),
+            [](Group const &left,
+               Group const &right) -> bool {
+              int left_retriable = left.IsRetriable() ? 0 : 1;
+              int right_retriable = right.IsRetriable() ? 0 : 1;
+              if (left_retriable == right_retriable) {
+                return left.GetAssignedTaskTime() > right.GetAssignedTaskTime();
+              }
+              return left_retriable < right_retriable;
+              return 0;
+            });
+
+  Group selected_group = sorted.front();
+  auto worker_to_kill = selected_group.SelectWorkerToKill();
+  bool should_retry = selected_group.WorkerCount() > 1;
+
+  // const static int32_t max_to_print = 10;
+  // RAY_LOG(INFO) << "The top 10 workers to be killed based on the worker killing policy:\n"
+  //               << WorkersDebugString(sorted, max_to_print, system_memory);
+
+  // return std::make_pair(sorted.front(), /*should retry*/ true);
+
+  return std::make_pair(worker_to_kill, should_retry);
+}
+
+bool Group::IsRetriable() const {
+  return retriable_;
+}
+
+const std::chrono::steady_clock::time_point Group::GetAssignedTaskTime() const {
+  return time_;
+}
+
+void Group::AddToGroup(std::shared_ptr<WorkerInterface> worker) {
+  if (worker->GetAssignedTaskTime() < time_) {
+    time_ = worker->GetAssignedTaskTime();
+  }
+  if (workers_.empty()) {
+    retriable_ = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
+  } else {
+    RAY_CHECK_EQ(retriable_, worker->GetAssignedTask().GetTaskSpecification().IsRetriable());
+  }
+  workers_.insert(worker);
+}
+
+const std::shared_ptr<WorkerInterface> Group::SelectWorkerToKill() const {
+  RAY_CHECK(!workers_.empty());
+
+  std::vector<std::shared_ptr<WorkerInterface>> sorted(workers_.begin(), workers_.end());
+
+  std::sort(sorted.begin(),
+            sorted.end(),
+            [](std::shared_ptr<WorkerInterface> const &left,
+               std::shared_ptr<WorkerInterface> const &right) -> bool {
+              int left_retriable =
+                  left->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
+              int right_retriable =
+                  right->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
+              if (left_retriable == right_retriable) {
+                return left->GetAssignedTaskTime() > right->GetAssignedTaskTime();
+              }
+              return left_retriable < right_retriable;
+            });
+
+  return sorted.front();
+}
+
+int32_t Group::WorkerCount() const {
+  return workers_.size();
+}
+
+} // namespace raylet
+
+} // namespace ray
diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h
new file mode 100644
index 000000000000..9158cb0183ac
--- /dev/null
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h
@@ -0,0 +1,90 @@
+// Copyright 2022 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <chrono>
+
+#include "ray/common/memory_monitor.h"
+#include "ray/raylet/worker.h"
+#include "ray/raylet/worker_killing_policy.h"
+#include "absl/container/flat_hash_set.h"
+
+
+namespace ray {
+
+namespace raylet {
+
+/// Groups worker by its owner id if it is a task. Each actor belongs to its own group.
+/// The inter-group policy prioritizes killing groups that are retriable first, then in LIFO order,
+/// where each group's priority is based on the time of its earliest submitted member.
+/// The intra-group policy prioritizes killing in LIFO order.
+///
+/// It will set the task to-be-killed to be non-retriable if it is the last member of the group.
+/// Otherwise the task is set to be retriable.
+class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy {
+ public:
+  GroupByOwnerIdWorkerKillingPolicy();
+  const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
+      const std::vector<std::shared_ptr<WorkerInterface>> &workers,
+      const MemorySnapshot &system_memory) const;
+};
+
+struct GroupKey {
+  GroupKey(const TaskID& owner_id, bool retriable): owner_id(owner_id), retriable(retriable) {}
+  const TaskID& owner_id;
+  bool retriable;
+};
+
+class Group {
+ public:
+  Group(const TaskID& owner_id, bool retriable): owner_id_(owner_id), retriable_(retriable) {}
+  bool IsRetriable() const;
+  const std::chrono::steady_clock::time_point GetAssignedTaskTime() const;
+  const std::shared_ptr<WorkerInterface> SelectWorkerToKill() const;
+  int32_t WorkerCount() const;
+  void AddToGroup(std::shared_ptr<WorkerInterface> worker);
+
+ private:
+  /// Tasks belonging to this group.
+  absl::flat_hash_set<std::shared_ptr<WorkerInterface>> workers_;
+
+  /// The earliest creation time of the tasks.
+  std::chrono::steady_clock::time_point time_ = std::chrono::steady_clock::now();
+
+  /// The owner id shared by tasks of this group.
+  TaskID owner_id_;
+
+  /// Whether the tasks are retriable.
+  bool retriable_;
+};
+
+typedef std::unordered_map<GroupKey,
+                           Group,
+                           std::function<unsigned long(const GroupKey&)>,
+                           std::function<bool(const GroupKey&, const GroupKey&)>> GroupMap;
+
+unsigned long group_key_hashing_func(const GroupKey& key) {
+  unsigned long hash = 0;
+  // for(size_t i=0; i
+  boost::hash_combine(hash, key.owner_id.Hex());
+  boost::hash_combine(hash, key.retriable);
+  return hash;
+}
+
+bool group_key_equal_fn(const GroupKey& left, const GroupKey& right) {
+  return left.owner_id == right.owner_id && left.retriable == right.retriable;
+}
+
+} // namespace raylet
+
+} // namespace ray

From: Clarence Ng
Date: Wed, 21 Dec 2022 11:31:01 -0800
Subject: [PATCH 04/16] [core] group by owner policy

Signed-off-by: Clarence Ng
---
 .../worker_killing_policy_group_by_owner.cc   | 56 ++++++++++++++++---
 .../worker_killing_policy_group_by_owner.h    | 37 ++++++------
 2 files changed, 67 insertions(+), 26 deletions(-)

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
index 914f6a822e15..ff7a3bd8f772 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
@@ -79,17 +79,55 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill(
   Group selected_group = sorted.front();
   auto worker_to_kill = selected_group.SelectWorkerToKill();
-  bool should_retry = selected_group.WorkerCount() > 1;
+  bool should_retry = selected_group.GetAllWorkers().size() > 1;
 
+  RAY_LOG(INFO) << "The top 10 groups to be killed based on the worker killing policy:\n"
+                << PolicyDebugString(sorted, system_memory);
 
   return std::make_pair(worker_to_kill, should_retry);
 }
 
+std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(const std::vector<Group> &groups, const MemorySnapshot &system_memory) {
+  std::stringstream result;
+  int32_t group_index = 0;
+  for (auto &group : groups) {
+    result << "Group (retriable: " << group.IsRetriable() << ") (owner id: " << group.OwnerId() << ") (time counter: " << group.GetAssignedTaskTime().time_since_epoch().count() << "):\n";
+
+    int64_t worker_index = 0;
+    for (auto &worker : group.GetAllWorkers()) {
+      auto pid = worker->GetProcess().GetId();
+      int64_t used_memory = 0;
+      const auto pid_entry = system_memory.process_used_bytes.find(pid);
+      if (pid_entry != system_memory.process_used_bytes.end()) {
+        used_memory = pid_entry->second;
+      } else {
+        RAY_LOG_EVERY_MS(INFO, 60000)
+            << "Can't find memory usage for PID, reporting zero. PID: " << pid;
+      }
+      result << "Worker time counter "
+             << worker->GetAssignedTaskTime().time_since_epoch().count() << " worker id "
+             << worker->WorkerId() << " memory used " << used_memory << " task spec "
+             << worker->GetAssignedTask().GetTaskSpecification().DebugString() << "\n";
+
+      worker_index += 1;
+      if (worker_index > 10) {
+        break;
+      }
+    }
+
+    group_index += 1;
+    if (group_index > 10) {
+      break;
+    }
+  }
+
+  return result.str();
+}
+
+TaskID Group::OwnerId() const {
+  return owner_id_;
+}
+
 bool Group::IsRetriable() const {
   return retriable_;
 }
@@ -107,7 +145,7 @@ void Group::AddToGroup(std::shared_ptr<WorkerInterface> worker) {
   } else {
     RAY_CHECK_EQ(retriable_, worker->GetAssignedTask().GetTaskSpecification().IsRetriable());
   }
-  workers_.insert(worker);
+  workers_.push_back(worker);
 }
 
 const std::shared_ptr<WorkerInterface> Group::SelectWorkerToKill() const {
@@ -132,8 +170,8 @@ const std::shared_ptr<WorkerInterface> Group::SelectWorkerToKill() const {
   return sorted.front();
 }
 
-int32_t Group::WorkerCount() const {
-  return workers_.size();
+const std::vector<std::shared_ptr<WorkerInterface>> Group::GetAllWorkers() const{
+  return workers_;
 }
 
 } // namespace raylet
diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h
index 9158cb0183ac..335338feb74b 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.h
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h
@@ -26,21 +26,6 @@ namespace ray {
 
 namespace raylet {
 
-/// Groups worker by its owner id if it is a task. Each actor belongs to its own group.
-/// The inter-group policy prioritizes killing groups that are retriable first, then in LIFO order,
-/// where each group's priority is based on the time of its earliest submitted member.
-/// The intra-group policy prioritizes killing in LIFO order.
-///
-/// It will set the task to-be-killed to be non-retriable if it is the last member of the group.
-/// Otherwise the task is set to be retriable.
-class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy {
- public:
-  GroupByOwnerIdWorkerKillingPolicy();
-  const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
-      const std::vector<std::shared_ptr<WorkerInterface>> &workers,
-      const MemorySnapshot &system_memory) const;
-};
-
 struct GroupKey {
   GroupKey(const TaskID& owner_id, bool retriable): owner_id(owner_id), retriable(retriable) {}
   const TaskID& owner_id;
@@ -50,15 +35,16 @@ struct GroupKey {
 class Group {
  public:
   Group(const TaskID& owner_id, bool retriable): owner_id_(owner_id), retriable_(retriable) {}
+  TaskID OwnerId() const;
   bool IsRetriable() const;
   const std::chrono::steady_clock::time_point GetAssignedTaskTime() const;
   const std::shared_ptr<WorkerInterface> SelectWorkerToKill() const;
-  int32_t WorkerCount() const;
+  const std::vector<std::shared_ptr<WorkerInterface>> GetAllWorkers() const;
   void AddToGroup(std::shared_ptr<WorkerInterface> worker);
 
  private:
   /// Tasks belonging to this group.
-  absl::flat_hash_set<std::shared_ptr<WorkerInterface>> workers_;
+  std::vector<std::shared_ptr<WorkerInterface>> workers_;
 
   /// The earliest creation time of the tasks.
   std::chrono::steady_clock::time_point time_ = std::chrono::steady_clock::now();
 
   /// The owner id shared by tasks of this group.
   TaskID owner_id_;
 
+/// Groups worker by its owner id if it is a task. Each actor belongs to its own group.
+/// The inter-group policy prioritizes killing groups that are retriable first, then in LIFO order,
+/// where each group's priority is based on the time of its earliest submitted member.
+/// The intra-group policy prioritizes killing in LIFO order.
+///
+/// It will set the task to-be-killed to be non-retriable if it is the last member of the group.
+/// Otherwise the task is set to be retriable.
+class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy {
+ public:
+  GroupByOwnerIdWorkerKillingPolicy();
+  const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
+      const std::vector<std::shared_ptr<WorkerInterface>> &workers,
+      const MemorySnapshot &system_memory) const;
+ private:
+  static std::string PolicyDebugString(const std::vector<Group> &groups,
+                                       const MemorySnapshot &system_memory);
+};
 
 } // namespace raylet
 
 } // namespace ray

From 68ca544b3810c55f1958714450b44158f2c08c0d Mon Sep 17 00:00:00 2001
From: Clarence Ng
Date: Wed, 21 Dec 2022 18:36:25 -0800
Subject: [PATCH 05/16] [core] group by owner policy

Signed-off-by: Clarence Ng
---
 src/ray/common/ray_config_def.h               |  2 ++
 src/ray/raylet/node_manager.cc                |  4 ++--
 src/ray/raylet/node_manager.h                 |  4 ++++
 src/ray/raylet/worker_killing_policy.cc       | 17 +++++++++++++++
 src/ray/raylet/worker_killing_policy.h        |  8 +++++++-
 .../worker_killing_policy_group_by_owner.cc   | 13 ++++++++++++-
 .../worker_killing_policy_group_by_owner.h    | 13 ++-----------
 7 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 6424ffcf32e3..3e9a8e6ea373 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -113,6 +113,8 @@ RAY_CONFIG(uint64_t, task_failure_entry_ttl_ms, 15 * 60 * 1000)
 /// supported.
 RAY_CONFIG(uint64_t, task_oom_retries, 15)
 
+RAY_CONFIG(std::string, worker_killing_policy, "retriable_lifo")
+
 /// If the raylet fails to get agent info, we will retry after this interval.
 RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)
 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 559f98eb506b..1934533228eb 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -336,6 +336,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
       next_resource_seq_no_(0),
       ray_syncer_(io_service_, self_node_id_.Binary()),
       ray_syncer_service_(ray_syncer_),
+      worker_killing_policy_(CreateWorkerKillingPolicy(RayConfig::instance().worker_killing_policy())),
       memory_monitor_(std::make_unique<MemoryMonitor>(
           io_service,
           RayConfig::instance().memory_usage_threshold(),
@@ -2940,9 +2941,8 @@ MemoryUsageRefreshCallback NodeManager::CreateMemoryUsageRefreshCallback() {
                << "idle worker are occupying most of the memory.";
           return;
         }
-        RetriableLIFOWorkerKillingPolicy worker_killing_policy;
         auto worker_to_kill_and_should_retry =
-            worker_killing_policy.SelectWorkerToKill(workers, system_memory);
+            worker_killing_policy_->SelectWorkerToKill(workers, system_memory);
         auto worker_to_kill = worker_to_kill_and_should_retry.first;
         bool should_retry = worker_to_kill_and_should_retry.second;
         if (worker_to_kill == nullptr) {
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 1af98957906a..8aa428ce432d 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -46,6 +46,7 @@
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/bundle_spec.h"
 #include "ray/raylet/placement_group_resource_manager.h"
+#include "ray/raylet/worker_killing_policy.h"
 // clang-format on
 
 namespace ray {
@@ -865,8 +866,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   /// RaySyncerService for gRPC
   syncer::RaySyncerService ray_syncer_service_;
 
+  std::shared_ptr<WorkerKillingPolicy> worker_killing_policy_;
+
   /// Monitors and reports node memory usage and whether it is above threshold.
   std::unique_ptr<MemoryMonitor> memory_monitor_;
+
 };
 
 } // namespace raylet
diff --git a/src/ray/raylet/worker_killing_policy.cc b/src/ray/raylet/worker_killing_policy.cc
index f5e738f0193d..2890a232314d 100644
--- a/src/ray/raylet/worker_killing_policy.cc
+++ b/src/ray/raylet/worker_killing_policy.cc
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #include "ray/raylet/worker_killing_policy.h"
+#include "ray/raylet/worker_killing_policy_group_by_owner.h"
 
 #include
 
@@ -88,6 +89,22 @@ std::string WorkerKillingPolicy::WorkersDebugString(
   return result.str();
 }
 
+std::shared_ptr<WorkerKillingPolicy> CreateWorkerKillingPolicy(
+    std::string killing_policy_str) {
+  if (killing_policy_str == kLifoPolicy) {
+    RAY_LOG(INFO) << "Running RetriableLIFO policy.";
+    return std::make_shared<RetriableLIFOWorkerKillingPolicy>();
+  } else if (killing_policy_str == kGroupByOwner) {
+    RAY_LOG(INFO) << "Running GroupByOwner policy.";
+    return std::make_shared<GroupByOwnerIdWorkerKillingPolicy>();
+  } else {
+    RAY_LOG(ERROR)
+        << killing_policy_str
+        << " is an invalid killing policy. Defaulting to RetriableLIFO policy.";
+    return std::make_shared<RetriableLIFOWorkerKillingPolicy>();
+  }
+}
+
 } // namespace raylet
 
 } // namespace ray
diff --git a/src/ray/raylet/worker_killing_policy.h b/src/ray/raylet/worker_killing_policy.h
index d7ac5f43e484..d8e1ef742da9 100644
--- a/src/ray/raylet/worker_killing_policy.h
+++ b/src/ray/raylet/worker_killing_policy.h
@@ -26,6 +26,9 @@ namespace ray {
 
 namespace raylet {
 
+constexpr char kLifoPolicy[] = "retriable_lifo";
+constexpr char kGroupByOwner[] = "group_by_owner";
+
 /// Provides the policy on which worker to prioritize killing.
 class WorkerKillingPolicy {
  public:
@@ -34,7 +37,7 @@ class WorkerKillingPolicy {
   /// \param workers the list of candidate workers.
   /// \param system_memory snapshot of memory usage.
   ///
-  /// \return the worker to kill and whethewr the task on the worker should be retried.
+  /// \return the worker to kill and whether the task on the worker should be retried.
   virtual const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
       const std::vector<std::shared_ptr<WorkerInterface>> &workers,
       const MemorySnapshot &system_memory) const = 0;
 
@@ -65,6 +68,9 @@ class RetriableLIFOWorkerKillingPolicy : public WorkerKillingPolicy {
       const MemorySnapshot &system_memory) const;
 };
 
+std::shared_ptr<WorkerKillingPolicy> CreateWorkerKillingPolicy(
+    std::string killing_policy_str);
+
 } // namespace raylet
 
 } // namespace ray
diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
index ff7a3bd8f772..42011d5bbb87 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
@@ -40,7 +40,7 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill(
     return std::make_pair(nullptr, /*should retry*/ false);
   }
 
-  GroupMap group_map(10, group_key_hashing_func, group_key_equal_fn);
+  GroupMap group_map(10, GroupKeyHash, GroupKeyEquals);
 
@@ -174,6 +174,17 @@ const std::vector<std::shared_ptr<WorkerInterface>> Group::GetAllWorkers() const{
   return workers_;
 }
 
+unsigned long GroupByOwnerIdWorkerKillingPolicy::GroupKeyHash(const GroupKey &key) {
+  unsigned long hash = 0;
+  boost::hash_combine(hash, key.owner_id.Hex());
+  boost::hash_combine(hash, key.retriable);
+  return hash;
+}
+
+bool GroupByOwnerIdWorkerKillingPolicy::GroupKeyEquals(const GroupKey &left,
+                                                       const GroupKey &right) {
+  return left.owner_id == right.owner_id && left.retriable == right.retriable;
+}
+
 } // namespace raylet
 
 } // namespace ray
diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h
index 335338feb74b..408d2f6c3811 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.h
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h
@@ -60,17 +60,6 @@ typedef std::unordered_map<GroupKey,
 
-unsigned long group_key_hashing_func(const GroupKey& key) {
-  unsigned long hash = 0;
-  // for(size_t i=0; i
-  boost::hash_combine(hash, key.owner_id.Hex());
-  boost::hash_combine(hash, key.retriable);
-  return hash;
-}
-
-bool group_key_equal_fn(const GroupKey& left, const GroupKey& right) {
-  return left.owner_id == right.owner_id && left.retriable == right.retriable;
-}
-
 class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy {
  public:
   GroupByOwnerIdWorkerKillingPolicy();
   const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
       const std::vector<std::shared_ptr<WorkerInterface>> &workers,
       const MemorySnapshot &system_memory) const;
+ private:
+  static std::string PolicyDebugString(const std::vector<Group> &groups,
+                                       const MemorySnapshot &system_memory);
+  static unsigned long GroupKeyHash(const GroupKey& key);
+  static bool GroupKeyEquals(const GroupKey& left, const GroupKey& right);
 };
 
 } // namespace raylet

From fd959af7f8e58b22282649cc3ef453b0b1c88bb1 Mon Sep 17 00:00:00 2001
From: Clarence Ng
Date: Tue, 10 Jan 2023 09:43:07 -0800
Subject: [PATCH 06/16] [core] oom killer policy: group by owner id

Signed-off-by: Clarence Ng
---
 BUILD.bazel                                   |  16 ++-
 src/ray/raylet/node_manager.cc                |   3 +-
 src/ray/raylet/node_manager.h                 |   1 -
 src/ray/raylet/worker_killing_policy.cc       |   2 +-
 .../worker_killing_policy_group_by_owner.cc   |  89 ++++++++-------
 .../worker_killing_policy_group_by_owner.h    |  41 ++++---
 ...rker_killing_policy_group_by_owner_test.cc | 107 ++++++++++++++++++
 src/ray/raylet/worker_killing_policy_test.cc  |   2 -
 8 files changed, 194 insertions(+), 67 deletions(-)
 create mode 100644 src/ray/raylet/worker_killing_policy_group_by_owner_test.cc

diff --git a/BUILD.bazel b/BUILD.bazel
index f6595702b243..01417244034d 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1651,9 +1651,21 @@ cc_test(
     ],
     copts = COPTS,
     tags = ["team:core"],
-    target_compatible_with = [
-        "@platforms//os:linux",
+    deps = [
+        ":ray_common",
+        ":raylet_lib",
+        "@com_google_googletest//:gtest_main",
     ],
+)
+
+cc_test(
+    name = "worker_killing_policy_group_by_owner_test",
+    size = "small",
+    srcs = [
+        "src/ray/raylet/worker_killing_policy_group_by_owner_test.cc",
+    ],
+    copts = COPTS,
+    tags = ["team:core"],
     deps = [
         ":ray_common",
         ":raylet_lib",
         "@com_google_googletest//:gtest_main",
     ],
 )
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 9e18ae697cc7..1d7cb01b6951 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -283,7 +283,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
       next_resource_seq_no_(0),
       ray_syncer_(io_service_, self_node_id_.Binary()),
       ray_syncer_service_(ray_syncer_),
-      worker_killing_policy_(CreateWorkerKillingPolicy(RayConfig::instance().worker_killing_policy())),
+      worker_killing_policy_(
+          CreateWorkerKillingPolicy(RayConfig::instance().worker_killing_policy())),
       memory_monitor_(std::make_unique<MemoryMonitor>(
           io_service,
           RayConfig::instance().memory_usage_threshold(),
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 1df4a067043a..fde8aa7045c3 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -838,7 +838,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
 
   /// Monitors and reports node memory usage and whether it is above threshold.
   std::unique_ptr<MemoryMonitor> memory_monitor_;
-
 };
 
 } // namespace raylet
diff --git a/src/ray/raylet/worker_killing_policy.cc b/src/ray/raylet/worker_killing_policy.cc
index 2890a232314d..8996d2e243aa 100644
--- a/src/ray/raylet/worker_killing_policy.cc
+++ b/src/ray/raylet/worker_killing_policy.cc
@@ -13,13 +13,13 @@
 // limitations under the License.
 
 #include "ray/raylet/worker_killing_policy.h"
-#include "ray/raylet/worker_killing_policy_group_by_owner.h"
 
 #include
 
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/asio/periodical_runner.h"
 #include "ray/raylet/worker.h"
+#include "ray/raylet/worker_killing_policy_group_by_owner.h"
 #include "ray/raylet/worker_pool.h"
 
 namespace ray {
diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
index 42011d5bbb87..d21969701adc 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
@@ -12,18 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-#include "ray/raylet/worker_killing_policy.h"
+#include "ray/raylet/worker_killing_policy_group_by_owner.h"
 
 #include
 
+#include <boost/container_hash/hash.hpp>
+#include <unordered_map>
+
+#include "absl/container/flat_hash_map.h"
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/asio/periodical_runner.h"
 #include "ray/raylet/worker.h"
+#include "ray/raylet/worker_killing_policy.h"
 #include "ray/raylet/worker_pool.h"
-#include "ray/raylet/worker_killing_policy_group_by_owner.h"
-#include "absl/container/flat_hash_map.h"
-#include <boost/container_hash/hash.hpp>
-#include <unordered_map>
 
 namespace ray {
 
@@ -45,39 +46,42 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill(
   for (auto worker : workers) {
     TaskID owner_id = worker->GetAssignedTask().GetTaskSpecification().ParentTaskId();
     bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
-    
+
     GroupKey group_key(owner_id, retriable);
     auto it = group_map.find(group_key);
 
     if (it == group_map.end()) {
-        Group group(owner_id, retriable);
-        group_map.insert({group_key, group});
-        group.AddToGroup(worker);
+      Group group(owner_id, retriable);
+      group_map.insert({group_key, group});
+      group.AddToGroup(worker);
     } else {
-        auto group = it->second;
-        group.AddToGroup(worker);
+      auto group = it->second;
+      group.AddToGroup(worker);
     }
   }
-  
+
   std::vector<Group> sorted;
-  for(auto it = group_map.begin(); it != group_map.end(); ++it) {
+  for (auto it = group_map.begin(); it != group_map.end(); ++it) {
     sorted.push_back(it->second);
   }
 
-  std::sort(sorted.begin(),
-            sorted.end(),
-            [](Group const &left,
-               Group const &right) -> bool {
-              int left_retriable = left.IsRetriable() ? 0 : 1;
-              int right_retriable = right.IsRetriable() ? 0 : 1;
-              if (left_retriable == right_retriable) {
-                return left.GetAssignedTaskTime() > right.GetAssignedTaskTime();
-              }
-              return left_retriable < right_retriable;
-              return 0;
-            });
-
+  std::sort(
+      sorted.begin(), sorted.end(), [](Group const &left, Group const &right) -> bool {
+        int left_retriable = left.IsRetriable() ? 0 : 1;
+        int right_retriable = right.IsRetriable() ? 0 : 1;
+        if (left_retriable == right_retriable) {
+          return left.GetAssignedTaskTime() > right.GetAssignedTaskTime();
+        }
+        return left_retriable < right_retriable;
+      });
+
   Group selected_group = sorted.front();
+  for (Group group : sorted) {
+    if (group.GetAllWorkers().size() > 1) {
+      selected_group = group;
+      break;
+    }
+  }
   auto worker_to_kill = selected_group.SelectWorkerToKill();
   bool should_retry = selected_group.GetAllWorkers().size() > 1;
 
   RAY_LOG(INFO) << "The top 10 groups to be killed based on the worker killing policy:\n"
                 << PolicyDebugString(sorted, system_memory);
 
@@ -91,14 +95,14 @@ std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(
-  result << "Group (retriable: " << group.IsRetriable() << ") (owner id: " << group.OwnerId() << ") (time counter: " << group.GetAssignedTaskTime().time_since_epoch().count() << "):\n";
+  result << "Group (retriable: " << group.IsRetriable()
+         << ") (owner id: " << group.OwnerId() << ") (time counter: "
+         << group.GetAssignedTaskTime().time_since_epoch().count() << "):\n";
 
     int64_t worker_index = 0;
     for (auto &worker : group.GetAllWorkers()) {
       auto pid = worker->GetProcess().GetId();
       int64_t used_memory = 0;
       const auto pid_entry = system_memory.process_used_bytes.find(pid);
       if (pid_entry != system_memory.process_used_bytes.end()) {
         used_memory = pid_entry->second;
       } else {
         RAY_LOG_EVERY_MS(INFO, 60000)
             << "Can't find memory usage for PID, reporting zero. PID: " << pid;
       }
       result << "Worker time counter "
              << worker->GetAssignedTaskTime().time_since_epoch().count() << " worker id "
              << worker->WorkerId() << " memory used " << used_memory << " task spec "
              << worker->GetAssignedTask().GetTaskSpecification().DebugString() << "\n";
 
       worker_index += 1;
       if (worker_index > 10) {
         break;
       }
     }
 
     group_index += 1;
     if (group_index > 10) {
       break;
     }
   }
 
   return result.str();
 }
 
 TaskID Group::OwnerId() const { return owner_id_; }
 
 bool Group::IsRetriable() const { return retriable_; }
 
 const std::chrono::steady_clock::time_point Group::GetAssignedTaskTime() const {
   return time_;
 }
 
 void Group::AddToGroup(std::shared_ptr<WorkerInterface> worker) {
   if (worker->GetAssignedTaskTime() < time_) {
     time_ = worker->GetAssignedTaskTime();
   }
+  bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
   if (workers_.empty()) {
-    retriable_ = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
+    retriable_ = retriable;
   } else {
-    RAY_CHECK_EQ(retriable_, worker->GetAssignedTask().GetTaskSpecification().IsRetriable());
+    RAY_CHECK_EQ(retriable_, retriable);
   }
   workers_.push_back(worker);
 }
 
 const std::shared_ptr<WorkerInterface> Group::SelectWorkerToKill() const {
   RAY_CHECK(!workers_.empty());
 
   std::vector<std::shared_ptr<WorkerInterface>> sorted(workers_.begin(), workers_.end());
 
   std::sort(sorted.begin(),
             sorted.end(),
             [](std::shared_ptr<WorkerInterface> const &left,
                std::shared_ptr<WorkerInterface> const &right) -> bool {
               int left_retriable =
                   left->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
               int right_retriable =
                   right->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
               if (left_retriable == right_retriable) {
                 return left->GetAssignedTaskTime() > right->GetAssignedTaskTime();
               }
               return left_retriable < right_retriable;
             });
 
   return sorted.front();
 }
 
-const std::vector<std::shared_ptr<WorkerInterface>> Group::GetAllWorkers() const{
+const std::vector<std::shared_ptr<WorkerInterface>> Group::GetAllWorkers() const {
   return workers_;
 }
 
diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h
index 408d2f6c3811..7542bb12541b 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.h
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h
@@ -16,32 +16,33 @@
 
 #include
 
+#include "absl/container/flat_hash_set.h"
 #include "ray/common/memory_monitor.h"
 #include "ray/raylet/worker.h"
 #include "ray/raylet/worker_killing_policy.h"
-#include "absl/container/flat_hash_set.h"
-
 
 namespace ray {
 
 namespace raylet {
 
 struct GroupKey {
-  GroupKey(const TaskID& owner_id, bool retriable): owner_id(owner_id), retriable(retriable) {}
-  const TaskID& owner_id;
+  GroupKey(const TaskID &owner_id, bool retriable)
+      : owner_id(owner_id), retriable(retriable) {}
+  const TaskID &owner_id;
   bool retriable;
 };
 
 class Group {
  public:
-  Group(const TaskID& owner_id, bool retriable): owner_id_(owner_id), retriable_(retriable) {}
+  Group(const TaskID &owner_id, bool retriable)
+      : owner_id_(owner_id), retriable_(retriable) {}
   TaskID OwnerId() const;
   bool IsRetriable() const;
   const std::chrono::steady_clock::time_point GetAssignedTaskTime() const;
   const std::shared_ptr<WorkerInterface> SelectWorkerToKill() const;
SelectWorkerToKill() const; const std::vector> GetAllWorkers() const; void AddToGroup(std::shared_ptr worker); - + private: /// Tasks belonging to this group. std::vector> workers_; @@ -56,27 +57,31 @@ class Group { bool retriable_; }; -typedef std::unordered_map, - std::function> GroupMap; +typedef std::unordered_map, + std::function> + GroupMap; /// Groups worker by its owner id if it is a task. Each actor belongs to its own group. -/// The inter-group policy prioritizes killing groups that are retriable first, then in LIFO order, -/// where each group's priority is based on the time of its earliest submitted member. -/// The intra-group policy prioritizes killing in LIFO order. -/// -/// It will set the task to-be-killed to be non-retriable if it is the last member of the group. -/// Otherwise the task is set to be retriable. +/// The inter-group policy prioritizes killing groups that are retriable first, then in +/// LIFO order, where each group's priority is based on the time of its earliest submitted +/// member. The intra-group policy prioritizes killing in LIFO order. +/// +/// It will set the task to-be-killed to be non-retriable if it is the last member of the +/// group. Otherwise the task is set to be retriable. class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy { public: GroupByOwnerIdWorkerKillingPolicy(); const std::pair, bool> SelectWorkerToKill( const std::vector> &workers, const MemorySnapshot &system_memory) const; + private: - static std::string PolicyDebugString(const std::vector &groups, const MemorySnapshot &system_memory); - static unsigned long GroupKeyHash(const GroupKey& key); - static bool GroupKeyEquals(const GroupKey& left, const GroupKey& right); + static std::string PolicyDebugString(const std::vector &groups, + const MemorySnapshot &system_memory); + static unsigned long GroupKeyHash(const GroupKey &key); + static bool GroupKeyEquals(const GroupKey &left, const GroupKey &right); }; } // namespace raylet diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc new file mode 100644 index 000000000000..319dc8b1fcff --- /dev/null +++ b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc @@ -0,0 +1,107 @@ +// Copyright 2022 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
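// For reference: the GroupMap typedef above plugs the policy's GroupKeyHash and
// GroupKeyEquals callables into std::unordered_map via std::function, and the
// .cc file constructs it with a bucket count plus the two callables
// (GroupMap group_map(10, GroupKeyHash, GroupKeyEquals)). A minimal,
// self-contained sketch of the same pattern -- Key and its fields are
// illustrative stand-ins, not part of this patch:
//
//   #include <cstddef>
//   #include <functional>
//   #include <string>
//   #include <unordered_map>
//
//   struct Key {
//     std::string owner_id;
//     bool retriable;
//   };
//   std::size_t KeyHash(const Key &k) {
//     return std::hash<std::string>()(k.owner_id) ^ (k.retriable ? 1 : 0);
//   }
//   bool KeyEquals(const Key &l, const Key &r) {
//     return l.owner_id == r.owner_id && l.retriable == r.retriable;
//   }
//   using Map = std::unordered_map<Key,
//                                  int,
//                                  std::function<std::size_t(const Key &)>,
//                                  std::function<bool(const Key &, const Key &)>>;
//
//   // Bucket count, hash, and equality are supplied at construction time,
//   // mirroring the GroupMap constructor call in the .cc file.
//   Map BuildMap() { return Map(10, KeyHash, KeyEquals); }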
+ +#include "ray/raylet/worker_killing_policy.h" +#include "ray/raylet/worker_killing_policy_group_by_owner.h" + +#include "gtest/gtest.h" +#include "ray/common/task/task_spec.h" +#include "ray/raylet/test/util.h" + +namespace ray { + +namespace raylet { + +class WorkerKillingGroupByOwnerTest : public ::testing::Test { + protected: + instrumented_io_context io_context_; + int32_t port_ = 2389; + GroupByOwnerIdWorkerKillingPolicy worker_killing_policy_; + + std::shared_ptr CreateActorCreationWorker(int32_t max_restarts) { + rpc::TaskSpec message; + message.mutable_actor_creation_task_spec()->set_max_actor_restarts(max_restarts); + message.set_type(ray::rpc::TaskType::ACTOR_CREATION_TASK); + TaskSpecification task_spec(message); + RayTask task(task_spec); + auto worker = std::make_shared(ray::WorkerID::FromRandom(), port_); + worker->SetAssignedTask(task); + return worker; + } + + std::shared_ptr CreateTaskWorker(int32_t max_retries) { + rpc::TaskSpec message; + message.set_max_retries(max_retries); + message.set_type(ray::rpc::TaskType::NORMAL_TASK); + TaskSpecification task_spec(message); + RayTask task(task_spec); + auto worker = std::make_shared(ray::WorkerID::FromRandom(), port_); + worker->SetAssignedTask(task); + return worker; + } +}; + +TEST_F(WorkerKillingGroupByOwnerTest, TestEmptyWorkerPoolSelectsNullWorker) { + std::vector> workers; + auto worker_to_kill_and_should_retry = + worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + auto worker_to_kill = worker_to_kill_and_should_retry.first; + ASSERT_TRUE(worker_to_kill == nullptr); +} + +TEST_F(WorkerKillingGroupByOwnerTest, + TestPreferRetriableOverNonRetriableAndOrderByTimestampDescending) { + // std::vector> workers; + // auto first_submitted = WorkerKillerTest::CreateActorTaskWorker(7 /* max_restarts */); + // auto second_submitted = + // WorkerKillerTest::CreateActorCreationWorker(5 /* max_restarts */); + // auto third_submitted = WorkerKillerTest::CreateTaskWorker(0 /* max_restarts */); + // auto fourth_submitted = WorkerKillerTest::CreateTaskWorker(11 /* max_restarts */); + // auto fifth_submitted = + // WorkerKillerTest::CreateActorCreationWorker(0 /* max_restarts */); + // auto sixth_submitted = WorkerKillerTest::CreateActorTaskWorker(0 /* max_restarts */); + + // workers.push_back(first_submitted); + // workers.push_back(second_submitted); + // workers.push_back(third_submitted); + // workers.push_back(fourth_submitted); + // workers.push_back(fifth_submitted); + // workers.push_back(sixth_submitted); + + // std::vector> expected_order; + // expected_order.push_back(fourth_submitted); + // expected_order.push_back(second_submitted); + // expected_order.push_back(sixth_submitted); + // expected_order.push_back(fifth_submitted); + // expected_order.push_back(third_submitted); + // expected_order.push_back(first_submitted); + + // for (const auto &expected : expected_order) { + // auto worker_to_kill_and_should_retry = + // worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + // auto worker_to_kill = worker_to_kill_and_should_retry.first; + // ASSERT_EQ(worker_to_kill->WorkerId(), expected->WorkerId()); + // workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), + // workers.end()); + // } +} + +} // namespace raylet + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/raylet/worker_killing_policy_test.cc b/src/ray/raylet/worker_killing_policy_test.cc index 
ce2edbb0edd4..d5588491f6eb 100644 --- a/src/ray/raylet/worker_killing_policy_test.cc +++ b/src/ray/raylet/worker_killing_policy_test.cc @@ -14,8 +14,6 @@ #include "ray/raylet/worker_killing_policy.h" -#include - #include "gtest/gtest.h" #include "ray/common/task/task_spec.h" #include "ray/raylet/test/util.h" From db1305bcce6b4030936dfa70efb086e0c77e07ce Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Tue, 10 Jan 2023 17:11:59 -0800 Subject: [PATCH 07/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- src/ray/common/task/task_spec.cc | 3 +- src/ray/raylet/test/util.h | 6 +- .../worker_killing_policy_group_by_owner.cc | 20 +- .../worker_killing_policy_group_by_owner.h | 6 +- ...rker_killing_policy_group_by_owner_test.cc | 189 ++++++++++++++---- 5 files changed, 172 insertions(+), 52 deletions(-) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 44c7213bd7fe..363b66ca7df1 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -361,7 +361,7 @@ ActorID TaskSpecification::ActorCreationId() const { } int64_t TaskSpecification::MaxActorRestarts() const { - RAY_CHECK(IsActorCreationTask()); + // RAY_CHECK(IsActorCreationTask()); return message_->actor_creation_task_spec().max_actor_restarts(); } @@ -521,6 +521,7 @@ std::string TaskSpecification::RuntimeEnvDebugString() const { } bool TaskSpecification::IsRetriable() const { + RAY_LOG(ERROR) << IsActorTask() << IsActorCreationTask() << IsNormalTask() << MaxActorRestarts() << MaxRetries(); if (IsActorTask()) { return false; } diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 55515fbe6aef..cc685c601098 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -34,7 +34,7 @@ class MockWorker : public WorkerInterface { void SetOwnerAddress(const rpc::Address &address) override { address_ = address; } - void AssignTaskId(const TaskID &task_id) override {} + void AssignTaskId(const TaskID &task_id) override { task_id_ = task_id; } void SetAssignedTask(const RayTask &assigned_task) override { task_ = assigned_task; @@ -96,8 +96,7 @@ class MockWorker : public WorkerInterface { } void SetAssignedPort(int port) override { RAY_CHECK(false) << "Method unused"; } const TaskID &GetAssignedTaskId() const override { - RAY_CHECK(false) << "Method unused"; - return TaskID::Nil(); + return task_id_; } bool AddBlockedTaskId(const TaskID &task_id) override { RAY_CHECK(false) << "Method unused"; @@ -191,6 +190,7 @@ class MockWorker : public WorkerInterface { RayTask task_; std::chrono::steady_clock::time_point task_assign_time_; int runtime_env_hash_; + TaskID task_id_; }; } // namespace raylet diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc index d21969701adc..1e3de3924a15 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc @@ -46,17 +46,20 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( for (auto worker : workers) { TaskID owner_id = worker->GetAssignedTask().GetTaskSpecification().ParentTaskId(); bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable(); + TaskID non_retriable_task_id = retriable ? 
TaskID::FromHex("Retriable") : worker->GetAssignedTaskId(); - GroupKey group_key(owner_id, retriable); + GroupKey group_key(owner_id, non_retriable_task_id); auto it = group_map.find(group_key); if (it == group_map.end()) { Group group(owner_id, retriable); - group_map.insert({group_key, group}); group.AddToGroup(worker); + group_map.emplace(group_key, std::move(group)); + RAY_LOG(ERROR) << group.OwnerId() << "ne group size " << group.GetAllWorkers().size() << " " << retriable << " " << group_key.non_retriable_task_id; } else { - auto group = it->second; + auto &group = it->second; group.AddToGroup(worker); + RAY_LOG(ERROR) << group.OwnerId() << "update group size " << group.GetAllWorkers().size() << " " << retriable << " " << group_key.non_retriable_task_id; } } @@ -75,6 +78,9 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( return left_retriable < right_retriable; }); + RAY_LOG(ERROR) << "groups sorted:\n" + << PolicyDebugString(sorted, system_memory); + Group selected_group = sorted.front(); for (Group group : sorted) { if (group.GetAllWorkers().size() > 1) { @@ -85,9 +91,6 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( auto worker_to_kill = selected_group.SelectWorkerToKill(); bool should_retry = selected_group.GetAllWorkers().size() > 1; - RAY_LOG(INFO) << "The top 10 groups to be killed based on the worker killing policy:\n" - << PolicyDebugString(sorted, system_memory); - return std::make_pair(worker_to_kill, should_retry); } @@ -154,7 +157,6 @@ void Group::AddToGroup(std::shared_ptr worker) { const std::shared_ptr Group::SelectWorkerToKill() const { RAY_CHECK(!workers_.empty()); - std::vector> sorted(workers_.begin(), workers_.end()); std::sort(sorted.begin(), @@ -181,13 +183,13 @@ const std::vector> Group::GetAllWorkers() const unsigned long GroupByOwnerIdWorkerKillingPolicy::GroupKeyHash(const GroupKey &key) { unsigned long hash = 0; boost::hash_combine(hash, key.owner_id.Hex()); - boost::hash_combine(hash, key.retriable); + boost::hash_combine(hash, key.non_retriable_task_id.Hex()); return hash; } bool GroupByOwnerIdWorkerKillingPolicy::GroupKeyEquals(const GroupKey &left, const GroupKey &right) { - return left.owner_id == right.owner_id && left.retriable == right.retriable; + return left.owner_id == right.owner_id && left.non_retriable_task_id == right.non_retriable_task_id; } } // namespace raylet diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h index 7542bb12541b..9d9372b1d4e8 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner.h +++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h @@ -26,10 +26,10 @@ namespace ray { namespace raylet { struct GroupKey { - GroupKey(const TaskID &owner_id, bool retriable) - : owner_id(owner_id), retriable(retriable) {} + GroupKey(const TaskID &owner_id, const TaskID &non_retriable_task_id) + : owner_id(owner_id), non_retriable_task_id(non_retriable_task_id) {} const TaskID &owner_id; - bool retriable; + const TaskID &non_retriable_task_id; }; class Group { diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc index 319dc8b1fcff..833e60efa5e7 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc @@ -27,27 +27,38 @@ class WorkerKillingGroupByOwnerTest : public ::testing::Test { protected: instrumented_io_context io_context_; int32_t port_ = 2389; + JobID job_id_ = 
JobID::FromInt(75); + bool should_retry = true; + bool should_not_retry = false; + int32_t no_retry = 0; + int32_t has_retry = 1; GroupByOwnerIdWorkerKillingPolicy worker_killing_policy_; - std::shared_ptr CreateActorCreationWorker(int32_t max_restarts) { + std::shared_ptr CreateActorCreationWorker(TaskID owner_id, int32_t max_restarts) { rpc::TaskSpec message; + message.set_task_id(TaskID::FromRandom(job_id_).Binary()); + message.set_parent_task_id(owner_id.Binary()); message.mutable_actor_creation_task_spec()->set_max_actor_restarts(max_restarts); message.set_type(ray::rpc::TaskType::ACTOR_CREATION_TASK); TaskSpecification task_spec(message); RayTask task(task_spec); auto worker = std::make_shared(ray::WorkerID::FromRandom(), port_); worker->SetAssignedTask(task); + worker->AssignTaskId(task.GetTaskSpecification().TaskId()); return worker; } - std::shared_ptr CreateTaskWorker(int32_t max_retries) { + std::shared_ptr CreateTaskWorker(TaskID owner_id, int32_t max_retries) { rpc::TaskSpec message; + message.set_task_id(TaskID::FromRandom(job_id_).Binary()); + message.set_parent_task_id(owner_id.Binary()); message.set_max_retries(max_retries); message.set_type(ray::rpc::TaskType::NORMAL_TASK); TaskSpecification task_spec(message); RayTask task(task_spec); auto worker = std::make_shared(ray::WorkerID::FromRandom(), port_); worker->SetAssignedTask(task); + worker->AssignTaskId(task.GetTaskSpecification().TaskId()); return worker; } }; @@ -61,42 +72,148 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestEmptyWorkerPoolSelectsNullWorker) { } TEST_F(WorkerKillingGroupByOwnerTest, - TestPreferRetriableOverNonRetriableAndOrderByTimestampDescending) { - // std::vector> workers; - // auto first_submitted = WorkerKillerTest::CreateActorTaskWorker(7 /* max_restarts */); - // auto second_submitted = - // WorkerKillerTest::CreateActorCreationWorker(5 /* max_restarts */); - // auto third_submitted = WorkerKillerTest::CreateTaskWorker(0 /* max_restarts */); - // auto fourth_submitted = WorkerKillerTest::CreateTaskWorker(11 /* max_restarts */); - // auto fifth_submitted = - // WorkerKillerTest::CreateActorCreationWorker(0 /* max_restarts */); - // auto sixth_submitted = WorkerKillerTest::CreateActorTaskWorker(0 /* max_restarts */); - - // workers.push_back(first_submitted); - // workers.push_back(second_submitted); - // workers.push_back(third_submitted); - // workers.push_back(fourth_submitted); - // workers.push_back(fifth_submitted); - // workers.push_back(sixth_submitted); - - // std::vector> expected_order; - // expected_order.push_back(fourth_submitted); - // expected_order.push_back(second_submitted); - // expected_order.push_back(sixth_submitted); - // expected_order.push_back(fifth_submitted); - // expected_order.push_back(third_submitted); - // expected_order.push_back(first_submitted); - - // for (const auto &expected : expected_order) { - // auto worker_to_kill_and_should_retry = - // worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - // auto worker_to_kill = worker_to_kill_and_should_retry.first; - // ASSERT_EQ(worker_to_kill->WorkerId(), expected->WorkerId()); - // workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), - // workers.end()); - // } + TestLastWorkerInGroupShouldNotRetry) { + std::vector> workers; + + auto owner_id = TaskID::ForDriverTask(job_id_); + auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, has_retry); + auto second_submitted = + WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, has_retry); 
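// For context: the has_retry / no_retry constants above map directly onto the
// proto fields the two factory helpers set (max_actor_restarts for
// actor-creation tasks, max_retries for normal tasks). A simplified sketch of
// the retriability rule these tests exercise -- assumed semantics paraphrasing
// TaskSpecification::IsRetriable, not the verbatim implementation:
//
//   #include <cstdint>
//
//   bool IsRetriableSketch(bool is_actor_task,
//                          bool is_actor_creation_task,
//                          int64_t max_actor_restarts,
//                          int64_t max_retries) {
//     if (is_actor_task) return false;  // plain actor tasks are never retried
//     if (is_actor_creation_task) return max_actor_restarts != 0;
//     return max_retries != 0;          // normal tasks
//   }
//
// Hence CreateActorCreationWorker(owner_id, has_retry) produces a retriable
// group member, while CreateTaskWorker(owner_id, no_retry) produces a
// non-retriable one.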
+ + workers.push_back(first_submitted); + workers.push_back(second_submitted); + + std::vector, bool>> expected; + expected.push_back(std::make_pair(second_submitted, should_retry)); + expected.push_back(std::make_pair(first_submitted, should_not_retry)); + + for (const auto &entry : expected) { + auto worker_to_kill_and_should_retry = + worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + auto worker_to_kill = worker_to_kill_and_should_retry.first; + bool retry = worker_to_kill_and_should_retry.second; + ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); + ASSERT_EQ(retry, entry.second); + workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), + workers.end()); + } +} + +TEST_F(WorkerKillingGroupByOwnerTest, + TestNonRetriableBelongsToItsOwnGroup) { + auto owner_id = TaskID::ForDriverTask(job_id_); + + std::vector> workers; + auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); + auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, no_retry); + workers.push_back(first_submitted); + workers.push_back(second_submitted); + + std::vector, bool>> expected; + expected.push_back(std::make_pair(second_submitted, should_not_retry)); + + auto worker_to_kill_and_should_retry = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + + auto worker_to_kill = worker_to_kill_and_should_retry.first; + bool retry = worker_to_kill_and_should_retry.second; + ASSERT_EQ(worker_to_kill->WorkerId(), second_submitted->WorkerId()); + ASSERT_EQ(retry, should_not_retry); +} + +TEST_F(WorkerKillingGroupByOwnerTest, + TestGroupSortedByFirstSubmittedTaskAndSkipKillGroupWithOneTaskLeft) { + auto first_group_owner_id = TaskID::FromRandom(job_id_); + auto second_group_owner_id = TaskID::FromRandom(job_id_); + + std::vector> workers; + auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(first_group_owner_id, has_retry); + auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(second_group_owner_id, has_retry); + auto third_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(second_group_owner_id, has_retry); + auto fourth_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(second_group_owner_id, has_retry); + auto fifth_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); + auto sixth_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); + workers.push_back(first_submitted); + workers.push_back(second_submitted); + workers.push_back(third_submitted); + workers.push_back(fourth_submitted); + workers.push_back(fifth_submitted); + workers.push_back(sixth_submitted); + + std::vector, bool>> expected; + expected.push_back(std::make_pair(fourth_submitted, should_retry)); + expected.push_back(std::make_pair(third_submitted, should_retry)); + expected.push_back(std::make_pair(sixth_submitted, should_retry)); + expected.push_back(std::make_pair(fifth_submitted, should_retry)); + expected.push_back(std::make_pair(second_submitted, should_not_retry)); + expected.push_back(std::make_pair(first_submitted, should_not_retry)); + + for (const auto &entry : expected) { + auto worker_to_kill_and_should_retry = + worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + auto worker_to_kill = worker_to_kill_and_should_retry.first; + bool retry = worker_to_kill_and_should_retry.second; + ASSERT_EQ(worker_to_kill->WorkerId(), 
entry.first->WorkerId()); + ASSERT_EQ(retry, entry.second); + workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), + workers.end()); + } } +// TEST_F(WorkerKillingGroupByOwnerTest, +// TestTwoGroupsSkipGroupWithOneTaskLeft) { +// auto owner_id = TaskID::ForDriverTask(job_id_); + +// std::vector> workers; +// auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); +// auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, no_retry); +// workers.push_back(first_submitted); +// workers.push_back(second_submitted); + +// std::vector, bool>> expected; +// expected.push_back(std::make_pair(second_submitted, should_not_retry)); + +// auto worker_to_kill_and_should_retry = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + +// auto worker_to_kill = worker_to_kill_and_should_retry.first; +// bool retry = worker_to_kill_and_should_retry.second; +// ASSERT_EQ(worker_to_kill->WorkerId(), second_submitted->WorkerId()); +// ASSERT_EQ(retry, should_not_retry); +// } + +// TEST_F(WorkerKillingGroupByOwnerTest, +// TestGroupOrderedByLifoRetriable) { +// std::vector> workers; + +// JobID job_id = JobID::FromInt(75); + +// auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), has_retry); +// auto second_submitted = +// WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), has_retry); +// auto third_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), no_retry); +// auto fourth_submitted = +// WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), no_retry); + +// workers.push_back(first_submitted); +// workers.push_back(second_submitted); +// workers.push_back(third_submitted); +// workers.push_back(fourth_submitted); + +// std::vector, bool>> expected; +// expected.push_back(std::make_pair(second_submitted, should_retry)); +// expected.push_back(std::make_pair(first_submitted, should_not_retry)); + +// for (const auto &entry : expected) { +// auto worker_to_kill_and_should_retry = +// worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); +// auto worker_to_kill = worker_to_kill_and_should_retry.first; +// bool retry = worker_to_kill_and_should_retry.second; +// ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); +// ASSERT_EQ(retry, entry.second); +// workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), +// workers.end()); +// } +// } + } // namespace raylet } // namespace ray From 6cf64f3fc2d6f279af3ee32bba4003c228153a8f Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Wed, 11 Jan 2023 08:36:55 -0800 Subject: [PATCH 08/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- src/ray/common/task/task_spec.cc | 3 +- src/ray/raylet/node_manager.h | 1 + src/ray/raylet/test/util.h | 4 +- .../worker_killing_policy_group_by_owner.cc | 13 ++- ...rker_killing_policy_group_by_owner_test.cc | 91 +++++++++++-------- 5 files changed, 62 insertions(+), 50 deletions(-) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 363b66ca7df1..44c7213bd7fe 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -361,7 +361,7 @@ ActorID TaskSpecification::ActorCreationId() const { } int64_t TaskSpecification::MaxActorRestarts() const { - // RAY_CHECK(IsActorCreationTask()); + RAY_CHECK(IsActorCreationTask()); return 
message_->actor_creation_task_spec().max_actor_restarts(); } @@ -521,7 +521,6 @@ std::string TaskSpecification::RuntimeEnvDebugString() const { } bool TaskSpecification::IsRetriable() const { - RAY_LOG(ERROR) << IsActorTask() << IsActorCreationTask() << IsNormalTask() << MaxActorRestarts() << MaxRetries(); if (IsActorTask()) { return false; } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index fde8aa7045c3..f5eb93c3368f 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -834,6 +834,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// RaySyncerService for gRPC syncer::RaySyncerService ray_syncer_service_; + /// The Policy for selecting the worker to kill when the nodes runs out of memory. std::shared_ptr worker_killing_policy_; /// Monitors and reports node memory usage and whether it is above threshold. diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index cc685c601098..7d0a220f00c2 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -95,9 +95,7 @@ class MockWorker : public WorkerInterface { return -1; } void SetAssignedPort(int port) override { RAY_CHECK(false) << "Method unused"; } - const TaskID &GetAssignedTaskId() const override { - return task_id_; - } + const TaskID &GetAssignedTaskId() const override { return task_id_; } bool AddBlockedTaskId(const TaskID &task_id) override { RAY_CHECK(false) << "Method unused"; return false; diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc index 1e3de3924a15..fddaa3d577b6 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc @@ -46,7 +46,8 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( for (auto worker : workers) { TaskID owner_id = worker->GetAssignedTask().GetTaskSpecification().ParentTaskId(); bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable(); - TaskID non_retriable_task_id = retriable ? TaskID::FromHex("Retriable") : worker->GetAssignedTaskId(); + TaskID non_retriable_task_id = + retriable ? 
TaskID::FromHex("Retriable") : worker->GetAssignedTaskId(); GroupKey group_key(owner_id, non_retriable_task_id); auto it = group_map.find(group_key); @@ -55,11 +56,9 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( Group group(owner_id, retriable); group.AddToGroup(worker); group_map.emplace(group_key, std::move(group)); - RAY_LOG(ERROR) << group.OwnerId() << "ne group size " << group.GetAllWorkers().size() << " " << retriable << " " << group_key.non_retriable_task_id; } else { auto &group = it->second; group.AddToGroup(worker); - RAY_LOG(ERROR) << group.OwnerId() << "update group size " << group.GetAllWorkers().size() << " " << retriable << " " << group_key.non_retriable_task_id; } } @@ -78,9 +77,6 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( return left_retriable < right_retriable; }); - RAY_LOG(ERROR) << "groups sorted:\n" - << PolicyDebugString(sorted, system_memory); - Group selected_group = sorted.front(); for (Group group : sorted) { if (group.GetAllWorkers().size() > 1) { @@ -91,6 +87,8 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( auto worker_to_kill = selected_group.SelectWorkerToKill(); bool should_retry = selected_group.GetAllWorkers().size() > 1; + RAY_LOG(INFO) << "Groups sorted based on the policy:\n" << PolicyDebugString(sorted, system_memory); + return std::make_pair(worker_to_kill, should_retry); } @@ -189,7 +187,8 @@ unsigned long GroupByOwnerIdWorkerKillingPolicy::GroupKeyHash(const GroupKey &ke bool GroupByOwnerIdWorkerKillingPolicy::GroupKeyEquals(const GroupKey &left, const GroupKey &right) { - return left.owner_id == right.owner_id && left.non_retriable_task_id == right.non_retriable_task_id; + return left.owner_id == right.owner_id && + left.non_retriable_task_id == right.non_retriable_task_id; } } // namespace raylet diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc index 833e60efa5e7..6e599cdccafd 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#include "ray/raylet/worker_killing_policy.h" #include "ray/raylet/worker_killing_policy_group_by_owner.h" #include "gtest/gtest.h" #include "ray/common/task/task_spec.h" #include "ray/raylet/test/util.h" +#include "ray/raylet/worker_killing_policy.h" namespace ray { @@ -34,7 +34,8 @@ class WorkerKillingGroupByOwnerTest : public ::testing::Test { int32_t has_retry = 1; GroupByOwnerIdWorkerKillingPolicy worker_killing_policy_; - std::shared_ptr CreateActorCreationWorker(TaskID owner_id, int32_t max_restarts) { + std::shared_ptr CreateActorCreationWorker(TaskID owner_id, + int32_t max_restarts) { rpc::TaskSpec message; message.set_task_id(TaskID::FromRandom(job_id_).Binary()); message.set_parent_task_id(owner_id.Binary()); @@ -48,7 +49,8 @@ class WorkerKillingGroupByOwnerTest : public ::testing::Test { return worker; } - std::shared_ptr CreateTaskWorker(TaskID owner_id, int32_t max_retries) { + std::shared_ptr CreateTaskWorker(TaskID owner_id, + int32_t max_retries) { rpc::TaskSpec message; message.set_task_id(TaskID::FromRandom(job_id_).Binary()); message.set_parent_task_id(owner_id.Binary()); @@ -71,15 +73,15 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestEmptyWorkerPoolSelectsNullWorker) { ASSERT_TRUE(worker_to_kill == nullptr); } -TEST_F(WorkerKillingGroupByOwnerTest, - TestLastWorkerInGroupShouldNotRetry) { +TEST_F(WorkerKillingGroupByOwnerTest, TestLastWorkerInGroupShouldNotRetry) { std::vector> workers; auto owner_id = TaskID::ForDriverTask(job_id_); - auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, has_retry); + auto first_submitted = + WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, has_retry); auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, has_retry); - + workers.push_back(first_submitted); workers.push_back(second_submitted); @@ -99,21 +101,23 @@ TEST_F(WorkerKillingGroupByOwnerTest, } } -TEST_F(WorkerKillingGroupByOwnerTest, - TestNonRetriableBelongsToItsOwnGroup) { +TEST_F(WorkerKillingGroupByOwnerTest, TestNonRetriableBelongsToItsOwnGroup) { auto owner_id = TaskID::ForDriverTask(job_id_); std::vector> workers; - auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); - auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, no_retry); + auto first_submitted = + WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); + auto second_submitted = + WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, no_retry); workers.push_back(first_submitted); workers.push_back(second_submitted); - + std::vector, bool>> expected; expected.push_back(std::make_pair(second_submitted, should_not_retry)); - - auto worker_to_kill_and_should_retry = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - + + auto worker_to_kill_and_should_retry = + worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + auto worker_to_kill = worker_to_kill_and_should_retry.first; bool retry = worker_to_kill_and_should_retry.second; ASSERT_EQ(worker_to_kill->WorkerId(), second_submitted->WorkerId()); @@ -126,19 +130,25 @@ TEST_F(WorkerKillingGroupByOwnerTest, auto second_group_owner_id = TaskID::FromRandom(job_id_); std::vector> workers; - auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(first_group_owner_id, has_retry); - auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(second_group_owner_id, has_retry); - auto third_submitted = 
WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(second_group_owner_id, has_retry); - auto fourth_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(second_group_owner_id, has_retry); - auto fifth_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); - auto sixth_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); + auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + first_group_owner_id, has_retry); + auto second_submitted = + WorkerKillingGroupByOwnerTest::CreateTaskWorker(second_group_owner_id, has_retry); + auto third_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + second_group_owner_id, has_retry); + auto fourth_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + second_group_owner_id, has_retry); + auto fifth_submitted = + WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); + auto sixth_submitted = + WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); workers.push_back(first_submitted); workers.push_back(second_submitted); workers.push_back(third_submitted); workers.push_back(fourth_submitted); workers.push_back(fifth_submitted); workers.push_back(sixth_submitted); - + std::vector, bool>> expected; expected.push_back(std::make_pair(fourth_submitted, should_retry)); expected.push_back(std::make_pair(third_submitted, should_retry)); @@ -146,7 +156,7 @@ TEST_F(WorkerKillingGroupByOwnerTest, expected.push_back(std::make_pair(fifth_submitted, should_retry)); expected.push_back(std::make_pair(second_submitted, should_not_retry)); expected.push_back(std::make_pair(first_submitted, should_not_retry)); - + for (const auto &entry : expected) { auto worker_to_kill_and_should_retry = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); @@ -164,16 +174,17 @@ TEST_F(WorkerKillingGroupByOwnerTest, // auto owner_id = TaskID::ForDriverTask(job_id_); // std::vector> workers; -// auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); -// auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, no_retry); -// workers.push_back(first_submitted); -// workers.push_back(second_submitted); - +// auto first_submitted = +// WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); auto +// second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, +// no_retry); workers.push_back(first_submitted); workers.push_back(second_submitted); + // std::vector, bool>> expected; // expected.push_back(std::make_pair(second_submitted, should_not_retry)); - -// auto worker_to_kill_and_should_retry = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - + +// auto worker_to_kill_and_should_retry = +// worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + // auto worker_to_kill = worker_to_kill_and_should_retry.first; // bool retry = worker_to_kill_and_should_retry.second; // ASSERT_EQ(worker_to_kill->WorkerId(), second_submitted->WorkerId()); @@ -186,13 +197,17 @@ TEST_F(WorkerKillingGroupByOwnerTest, // JobID job_id = JobID::FromInt(75); -// auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), has_retry); -// auto second_submitted = -// WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), has_retry); -// auto third_submitted = 
WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), no_retry); -// auto fourth_submitted = -// WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), no_retry); - +// auto first_submitted = +// WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), +// has_retry); auto second_submitted = +// WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), +// has_retry); +// auto third_submitted = +// WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), +// no_retry); auto fourth_submitted = +// WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), +// no_retry); + // workers.push_back(first_submitted); // workers.push_back(second_submitted); // workers.push_back(third_submitted); From de6b9d49e890fcc8a6aaf096261f914e90175cb7 Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Wed, 11 Jan 2023 10:45:34 -0800 Subject: [PATCH 09/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- .../worker_killing_policy_group_by_owner.cc | 30 ++---- .../worker_killing_policy_group_by_owner.h | 40 ++++---- ...rker_killing_policy_group_by_owner_test.cc | 93 +++++++------------ 3 files changed, 64 insertions(+), 99 deletions(-) diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc index fddaa3d577b6..8ad0c908005b 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc @@ -41,21 +41,19 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( return std::make_pair(nullptr, /*should retry*/ false); } - GroupMap group_map(10, GroupKeyHash, GroupKeyEquals); - + std::unordered_map group_map; for (auto worker : workers) { - TaskID owner_id = worker->GetAssignedTask().GetTaskSpecification().ParentTaskId(); bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable(); - TaskID non_retriable_task_id = - retriable ? TaskID::FromHex("Retriable") : worker->GetAssignedTaskId(); + TaskID owner_id = + retriable ? 
worker->GetAssignedTask().GetTaskSpecification().ParentTaskId() + : worker->GetAssignedTaskId(); - GroupKey group_key(owner_id, non_retriable_task_id); - auto it = group_map.find(group_key); + auto it = group_map.find(owner_id.Binary()); if (it == group_map.end()) { Group group(owner_id, retriable); group.AddToGroup(worker); - group_map.emplace(group_key, std::move(group)); + group_map.emplace(owner_id.Binary(), std::move(group)); } else { auto &group = it->second; group.AddToGroup(worker); @@ -87,7 +85,8 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( auto worker_to_kill = selected_group.SelectWorkerToKill(); bool should_retry = selected_group.GetAllWorkers().size() > 1; - RAY_LOG(INFO) << "Groups sorted based on the policy:\n" << PolicyDebugString(sorted, system_memory); + RAY_LOG(INFO) << "Groups sorted based on the policy:\n" + << PolicyDebugString(sorted, system_memory); return std::make_pair(worker_to_kill, should_retry); } @@ -178,19 +177,6 @@ const std::vector<std::shared_ptr<WorkerInterface>> Group::GetAllWorkers() const return workers_; } -unsigned long GroupByOwnerIdWorkerKillingPolicy::GroupKeyHash(const GroupKey &key) { - unsigned long hash = 0; - boost::hash_combine(hash, key.owner_id.Hex()); - boost::hash_combine(hash, key.non_retriable_task_id.Hex()); - return hash; -} - -bool GroupByOwnerIdWorkerKillingPolicy::GroupKeyEquals(const GroupKey &left, - const GroupKey &right) { - return left.owner_id == right.owner_id && - left.non_retriable_task_id == right.non_retriable_task_id; -} - } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h index 9d9372b1d4e8..b958166c0db4 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner.h +++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h @@ -25,22 +25,35 @@ namespace ray { namespace raylet { +/// Keys the group on its owner id. For a non-retriable task the key is its own task id, +/// since a non-retriable task forms its own group. struct GroupKey { - GroupKey(const TaskID &owner_id, const TaskID &non_retriable_task_id) - : owner_id(owner_id), non_retriable_task_id(non_retriable_task_id) {} + GroupKey(const TaskID &owner_id) : owner_id(owner_id) {} const TaskID &owner_id; - const TaskID &non_retriable_task_id; }; class Group { public: Group(const TaskID &owner_id, bool retriable) : owner_id_(owner_id), retriable_(retriable) {} + + /// The parent task id of the tasks belonging to this group. TaskID OwnerId() const; + + /// Whether tasks in this group are retriable. bool IsRetriable() const; + + /// Gets the task time of the earliest task of this group, to be + /// used for group priority. const std::chrono::steady_clock::time_point GetAssignedTaskTime() const; + + /// Returns the worker to be killed in this group, in LIFO order. const std::shared_ptr<WorkerInterface> SelectWorkerToKill() const; + + /// Tasks belonging to this group. const std::vector<std::shared_ptr<WorkerInterface>> GetAllWorkers() const; + + /// Adds the worker that runs the task to the group. void AddToGroup(std::shared_ptr<WorkerInterface> worker); private: @@ -57,19 +70,13 @@ class Group { bool retriable_; }; -typedef std::unordered_map<GroupKey, - Group, - std::function<unsigned long(const GroupKey &)>, - std::function<bool(const GroupKey &, const GroupKey &)>> - GroupMap; - -/// Groups worker by its owner id if it is a task. Each actor belongs to its own group. -/// The inter-group policy prioritizes killing groups that are retriable first, then in -/// LIFO order, where each group's priority is based on the time of its earliest submitted -/// member. The intra-group policy prioritizes killing in LIFO order. +/// Groups tasks by owner id. 
Non-retriable task (whether it be task or actor) forms +/// its own group. Prioritizes killing groups that are retriable first, then in LIFO +/// order, where each group's order is based on the time of its earliest submitted member. +/// Within the group it prioritizes killing task in LIFO order of the submission time. /// -/// It will set the task to-be-killed to be non-retriable if it is the last member of the -/// group. Otherwise the task is set to be retriable. +/// When selecting a worker / task to be killed, it will set the task to-be-killed to be +/// non-retriable if it is the last member of the group, and retriable otherwise. class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy { public: GroupByOwnerIdWorkerKillingPolicy(); @@ -78,10 +85,9 @@ class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy { const MemorySnapshot &system_memory) const; private: + /// Creates the debug string of the groups created by the policy. static std::string PolicyDebugString(const std::vector &groups, const MemorySnapshot &system_memory); - static unsigned long GroupKeyHash(const GroupKey &key); - static bool GroupKeyEquals(const GroupKey &left, const GroupKey &right); }; } // namespace raylet diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc index 6e599cdccafd..dae5fa8f080c 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc @@ -125,7 +125,7 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestNonRetriableBelongsToItsOwnGroup) { } TEST_F(WorkerKillingGroupByOwnerTest, - TestGroupSortedByFirstSubmittedTaskAndSkipKillGroupWithOneTaskLeft) { + TestGroupSortedByFirstSubmittedTaskAndSkipGroupWithOneTaskLeft) { auto first_group_owner_id = TaskID::FromRandom(job_id_); auto second_group_owner_id = TaskID::FromRandom(job_id_); @@ -169,65 +169,38 @@ TEST_F(WorkerKillingGroupByOwnerTest, } } -// TEST_F(WorkerKillingGroupByOwnerTest, -// TestTwoGroupsSkipGroupWithOneTaskLeft) { -// auto owner_id = TaskID::ForDriverTask(job_id_); - -// std::vector> workers; -// auto first_submitted = -// WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); auto -// second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, -// no_retry); workers.push_back(first_submitted); workers.push_back(second_submitted); - -// std::vector, bool>> expected; -// expected.push_back(std::make_pair(second_submitted, should_not_retry)); - -// auto worker_to_kill_and_should_retry = -// worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - -// auto worker_to_kill = worker_to_kill_and_should_retry.first; -// bool retry = worker_to_kill_and_should_retry.second; -// ASSERT_EQ(worker_to_kill->WorkerId(), second_submitted->WorkerId()); -// ASSERT_EQ(retry, should_not_retry); -// } - -// TEST_F(WorkerKillingGroupByOwnerTest, -// TestGroupOrderedByLifoRetriable) { -// std::vector> workers; - -// JobID job_id = JobID::FromInt(75); - -// auto first_submitted = -// WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), -// has_retry); auto second_submitted = -// WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), -// has_retry); -// auto third_submitted = -// WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(TaskID::FromRandom(job_id), -// no_retry); auto fourth_submitted = -// 
WorkerKillingGroupByOwnerTest::CreateTaskWorker(TaskID::FromRandom(job_id), -// no_retry); - -// workers.push_back(first_submitted); -// workers.push_back(second_submitted); -// workers.push_back(third_submitted); -// workers.push_back(fourth_submitted); - -// std::vector, bool>> expected; -// expected.push_back(std::make_pair(second_submitted, should_retry)); -// expected.push_back(std::make_pair(first_submitted, should_not_retry)); - -// for (const auto &entry : expected) { -// auto worker_to_kill_and_should_retry = -// worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); -// auto worker_to_kill = worker_to_kill_and_should_retry.first; -// bool retry = worker_to_kill_and_should_retry.second; -// ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); -// ASSERT_EQ(retry, entry.second); -// workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), -// workers.end()); -// } -// } +TEST_F(WorkerKillingGroupByOwnerTest, TestGroupSortedByRetriableLifo) { + std::vector> workers; + auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + TaskID::FromRandom(job_id_), has_retry); + auto second_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + TaskID::FromRandom(job_id_), has_retry); + auto third_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + TaskID::FromRandom(job_id_), no_retry); + auto fourth_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + TaskID::FromRandom(job_id_), no_retry); + workers.push_back(first_submitted); + workers.push_back(second_submitted); + workers.push_back(third_submitted); + workers.push_back(fourth_submitted); + + std::vector, bool>> expected; + expected.push_back(std::make_pair(second_submitted, should_not_retry)); + expected.push_back(std::make_pair(first_submitted, should_not_retry)); + expected.push_back(std::make_pair(fourth_submitted, should_not_retry)); + expected.push_back(std::make_pair(third_submitted, should_not_retry)); + + for (const auto &entry : expected) { + auto worker_to_kill_and_should_retry = + worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + auto worker_to_kill = worker_to_kill_and_should_retry.first; + bool retry = worker_to_kill_and_should_retry.second; + ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); + ASSERT_EQ(retry, entry.second); + workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), + workers.end()); + } +} } // namespace raylet From a4442a5921089f32faa3b8980c1ca0242d313a1e Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Wed, 11 Jan 2023 10:52:09 -0800 Subject: [PATCH 10/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- src/ray/raylet/node_manager.h | 2 +- src/ray/raylet/worker_killing_policy_group_by_owner.cc | 3 ++- src/ray/raylet/worker_killing_policy_group_by_owner_test.cc | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index f5eb93c3368f..e470ac218e6d 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -834,7 +834,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// RaySyncerService for gRPC syncer::RaySyncerService ray_syncer_service_; - /// The Policy for selecting the worker to kill when the nodes runs out of memory. + /// The Policy for selecting the worker to kill when the node runs out of memory. 
std::shared_ptr worker_killing_policy_; /// Monitors and reports node memory usage and whether it is above threshold. diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc index 8ad0c908005b..08cc02614480 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc @@ -75,15 +75,16 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( return left_retriable < right_retriable; }); + bool should_retry = false; Group selected_group = sorted.front(); for (Group group : sorted) { if (group.GetAllWorkers().size() > 1) { selected_group = group; + should_retry = true; break; } } auto worker_to_kill = selected_group.SelectWorkerToKill(); - bool should_retry = selected_group.GetAllWorkers().size() > 1; RAY_LOG(INFO) << "Groups sorted based on the policy:\n" << PolicyDebugString(sorted, system_memory); diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc index dae5fa8f080c..9d129c314e94 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc @@ -101,7 +101,7 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestLastWorkerInGroupShouldNotRetry) { } } -TEST_F(WorkerKillingGroupByOwnerTest, TestNonRetriableBelongsToItsOwnGroup) { +TEST_F(WorkerKillingGroupByOwnerTest, TestNonRetriableBelongsToItsOwnGroupAndLIFOKill) { auto owner_id = TaskID::ForDriverTask(job_id_); std::vector> workers; From 36500506603da51db629bf783ce2f27a80f4ed8c Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Wed, 11 Jan 2023 10:55:13 -0800 Subject: [PATCH 11/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- ...rker_killing_policy_group_by_owner_test.cc | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc index 9d129c314e94..a8a51f95bf73 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc @@ -28,10 +28,10 @@ class WorkerKillingGroupByOwnerTest : public ::testing::Test { instrumented_io_context io_context_; int32_t port_ = 2389; JobID job_id_ = JobID::FromInt(75); - bool should_retry = true; - bool should_not_retry = false; - int32_t no_retry = 0; - int32_t has_retry = 1; + bool should_retry_ = true; + bool should_not_retry_ = false; + int32_t no_retry_ = 0; + int32_t has_retry_ = 1; GroupByOwnerIdWorkerKillingPolicy worker_killing_policy_; std::shared_ptr CreateActorCreationWorker(TaskID owner_id, @@ -67,9 +67,9 @@ class WorkerKillingGroupByOwnerTest : public ::testing::Test { TEST_F(WorkerKillingGroupByOwnerTest, TestEmptyWorkerPoolSelectsNullWorker) { std::vector> workers; - auto worker_to_kill_and_should_retry = + auto worker_to_kill_and_should_retry_ = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - auto worker_to_kill = worker_to_kill_and_should_retry.first; + auto worker_to_kill = worker_to_kill_and_should_retry_.first; ASSERT_TRUE(worker_to_kill == nullptr); } @@ -78,22 +78,22 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestLastWorkerInGroupShouldNotRetry) { auto owner_id = TaskID::ForDriverTask(job_id_); auto first_submitted = - WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, has_retry); + 
WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, has_retry_); auto second_submitted = - WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, has_retry); + WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, has_retry_); workers.push_back(first_submitted); workers.push_back(second_submitted); std::vector, bool>> expected; - expected.push_back(std::make_pair(second_submitted, should_retry)); - expected.push_back(std::make_pair(first_submitted, should_not_retry)); + expected.push_back(std::make_pair(second_submitted, should_retry_)); + expected.push_back(std::make_pair(first_submitted, should_not_retry_)); for (const auto &entry : expected) { - auto worker_to_kill_and_should_retry = + auto worker_to_kill_and_should_retry_ = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - auto worker_to_kill = worker_to_kill_and_should_retry.first; - bool retry = worker_to_kill_and_should_retry.second; + auto worker_to_kill = worker_to_kill_and_should_retry_.first; + bool retry = worker_to_kill_and_should_retry_.second; ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); ASSERT_EQ(retry, entry.second); workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), @@ -106,22 +106,22 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestNonRetriableBelongsToItsOwnGroupAndLIF std::vector> workers; auto first_submitted = - WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry); + WorkerKillingGroupByOwnerTest::CreateActorCreationWorker(owner_id, no_retry_); auto second_submitted = - WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, no_retry); + WorkerKillingGroupByOwnerTest::CreateTaskWorker(owner_id, no_retry_); workers.push_back(first_submitted); workers.push_back(second_submitted); std::vector, bool>> expected; - expected.push_back(std::make_pair(second_submitted, should_not_retry)); + expected.push_back(std::make_pair(second_submitted, should_not_retry_)); - auto worker_to_kill_and_should_retry = + auto worker_to_kill_and_should_retry_ = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - auto worker_to_kill = worker_to_kill_and_should_retry.first; - bool retry = worker_to_kill_and_should_retry.second; + auto worker_to_kill = worker_to_kill_and_should_retry_.first; + bool retry = worker_to_kill_and_should_retry_.second; ASSERT_EQ(worker_to_kill->WorkerId(), second_submitted->WorkerId()); - ASSERT_EQ(retry, should_not_retry); + ASSERT_EQ(retry, should_not_retry_); } TEST_F(WorkerKillingGroupByOwnerTest, @@ -131,17 +131,17 @@ TEST_F(WorkerKillingGroupByOwnerTest, std::vector> workers; auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - first_group_owner_id, has_retry); + first_group_owner_id, has_retry_); auto second_submitted = - WorkerKillingGroupByOwnerTest::CreateTaskWorker(second_group_owner_id, has_retry); + WorkerKillingGroupByOwnerTest::CreateTaskWorker(second_group_owner_id, has_retry_); auto third_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - second_group_owner_id, has_retry); + second_group_owner_id, has_retry_); auto fourth_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - second_group_owner_id, has_retry); + second_group_owner_id, has_retry_); auto fifth_submitted = - WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); + WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry_); auto sixth_submitted = - 
WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry); + WorkerKillingGroupByOwnerTest::CreateTaskWorker(first_group_owner_id, has_retry_); workers.push_back(first_submitted); workers.push_back(second_submitted); workers.push_back(third_submitted); @@ -150,18 +150,18 @@ TEST_F(WorkerKillingGroupByOwnerTest, workers.push_back(sixth_submitted); std::vector, bool>> expected; - expected.push_back(std::make_pair(fourth_submitted, should_retry)); - expected.push_back(std::make_pair(third_submitted, should_retry)); - expected.push_back(std::make_pair(sixth_submitted, should_retry)); - expected.push_back(std::make_pair(fifth_submitted, should_retry)); - expected.push_back(std::make_pair(second_submitted, should_not_retry)); - expected.push_back(std::make_pair(first_submitted, should_not_retry)); + expected.push_back(std::make_pair(fourth_submitted, should_retry_)); + expected.push_back(std::make_pair(third_submitted, should_retry_)); + expected.push_back(std::make_pair(sixth_submitted, should_retry_)); + expected.push_back(std::make_pair(fifth_submitted, should_retry_)); + expected.push_back(std::make_pair(second_submitted, should_not_retry_)); + expected.push_back(std::make_pair(first_submitted, should_not_retry_)); for (const auto &entry : expected) { - auto worker_to_kill_and_should_retry = + auto worker_to_kill_and_should_retry_ = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - auto worker_to_kill = worker_to_kill_and_should_retry.first; - bool retry = worker_to_kill_and_should_retry.second; + auto worker_to_kill = worker_to_kill_and_should_retry_.first; + bool retry = worker_to_kill_and_should_retry_.second; ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); ASSERT_EQ(retry, entry.second); workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), @@ -172,29 +172,29 @@ TEST_F(WorkerKillingGroupByOwnerTest, TEST_F(WorkerKillingGroupByOwnerTest, TestGroupSortedByRetriableLifo) { std::vector> workers; auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - TaskID::FromRandom(job_id_), has_retry); + TaskID::FromRandom(job_id_), has_retry_); auto second_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - TaskID::FromRandom(job_id_), has_retry); + TaskID::FromRandom(job_id_), has_retry_); auto third_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - TaskID::FromRandom(job_id_), no_retry); + TaskID::FromRandom(job_id_), no_retry_); auto fourth_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - TaskID::FromRandom(job_id_), no_retry); + TaskID::FromRandom(job_id_), no_retry_); workers.push_back(first_submitted); workers.push_back(second_submitted); workers.push_back(third_submitted); workers.push_back(fourth_submitted); std::vector, bool>> expected; - expected.push_back(std::make_pair(second_submitted, should_not_retry)); - expected.push_back(std::make_pair(first_submitted, should_not_retry)); - expected.push_back(std::make_pair(fourth_submitted, should_not_retry)); - expected.push_back(std::make_pair(third_submitted, should_not_retry)); + expected.push_back(std::make_pair(second_submitted, should_not_retry_)); + expected.push_back(std::make_pair(first_submitted, should_not_retry_)); + expected.push_back(std::make_pair(fourth_submitted, should_not_retry_)); + expected.push_back(std::make_pair(third_submitted, should_not_retry_)); for (const auto &entry : expected) { - auto worker_to_kill_and_should_retry = + auto 
worker_to_kill_and_should_retry_ = worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); - auto worker_to_kill = worker_to_kill_and_should_retry.first; - bool retry = worker_to_kill_and_should_retry.second; + auto worker_to_kill = worker_to_kill_and_should_retry_.first; + bool retry = worker_to_kill_and_should_retry_.second; ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); ASSERT_EQ(retry, entry.second); workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), From 3eded6ec6101a245b2bdac8a0dc18ed251dfe100 Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Mon, 16 Jan 2023 10:08:11 -0800 Subject: [PATCH 12/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- src/ray/raylet/test/util.h | 8 +++----- src/ray/raylet/worker.h | 15 ++++++++------- src/ray/raylet/worker_killing_policy.cc | 6 ++++-- .../worker_killing_policy_group_by_owner.cc | 13 +++++++------ .../raylet/worker_killing_policy_group_by_owner.h | 6 ++++-- 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 7d0a220f00c2..3fe2ddffc41f 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -38,12 +38,10 @@ class MockWorker : public WorkerInterface { void SetAssignedTask(const RayTask &assigned_task) override { task_ = assigned_task; - task_assign_time_ = std::chrono::steady_clock::now(); + task_assign_time_ = absl::Now(); }; - const std::chrono::steady_clock::time_point GetAssignedTaskTime() const override { - return task_assign_time_; - }; + absl::Time GetAssignedTaskTime() const override { return task_assign_time_; }; const std::string IpAddress() const override { return address_.ip_address(); } @@ -186,7 +184,7 @@ class MockWorker : public WorkerInterface { BundleID bundle_id_; bool blocked_ = false; RayTask task_; - std::chrono::steady_clock::time_point task_assign_time_; + absl::Time task_assign_time_; int runtime_env_hash_; TaskID task_id_; }; diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index a3654fba8831..8ac630fc3f64 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -17,6 +17,8 @@ #include #include "absl/memory/memory.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" #include "gtest/gtest_prod.h" #include "ray/common/client_connection.h" #include "ray/common/id.h" @@ -109,7 +111,7 @@ class WorkerInterface { virtual bool IsAvailableForScheduling() const = 0; /// Time when the last task was assigned to this worker. - virtual const std::chrono::steady_clock::time_point GetAssignedTaskTime() const = 0; + virtual absl::Time GetAssignedTaskTime() const = 0; protected: virtual void SetStartupToken(StartupToken startup_token) = 0; @@ -213,12 +215,10 @@ class Worker : public WorkerInterface { void SetAssignedTask(const RayTask &assigned_task) { assigned_task_ = assigned_task; - task_assign_time_ = std::chrono::steady_clock::now(); - }; + task_assign_time_ = absl::Now(); + } - const std::chrono::steady_clock::time_point GetAssignedTaskTime() const { - return task_assign_time_; - }; + absl::Time GetAssignedTaskTime() const { return task_assign_time_; }; bool IsRegistered() { return rpc_client_ != nullptr; } @@ -298,7 +298,8 @@ class Worker : public WorkerInterface { /// RayTask being assigned to this worker. RayTask assigned_task_; /// Time when the last task was assigned to this worker. 
-  std::chrono::steady_clock::time_point task_assign_time_;
+  // std::chrono::steady_clock::time_point task_assign_time_;
+  absl::Time task_assign_time_;
   /// If true, a RPC need to be sent to notify the worker about GCS restarting.
   bool notify_gcs_restarted_ = false;
 };

diff --git a/src/ray/raylet/worker_killing_policy.cc b/src/ray/raylet/worker_killing_policy.cc
index 8996d2e243aa..671090588292 100644
--- a/src/ray/raylet/worker_killing_policy.cc
+++ b/src/ray/raylet/worker_killing_policy.cc
@@ -78,9 +78,11 @@ std::string WorkerKillingPolicy::WorkersDebugString(
           << "Can't find memory usage for PID, reporting zero. PID: " << pid;
     }
     result << "Worker " << index << ": task assigned time counter "
-           << worker->GetAssignedTaskTime().time_since_epoch().count() << " worker id "
-           << worker->WorkerId() << " memory used " << used_memory << " task spec "
+           << absl::FormatTime(worker->GetAssignedTaskTime(), absl::UTCTimeZone())
+           << " worker id " << worker->WorkerId() << " memory used " << used_memory
+           << " task spec "
            << worker->GetAssignedTask().GetTaskSpecification().DebugString() << "\n";
+    index += 1;
     if (index > num_workers) {
       break;

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
index 08cc02614480..66015c953da0 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
@@ -20,6 +20,7 @@
 #include
 #include "absl/container/flat_hash_map.h"
+#include "absl/time/time.h"
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/asio/periodical_runner.h"
 #include "ray/raylet/worker.h"
@@ -99,7 +100,8 @@ std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(
   for (auto &group : groups) {
     result << "Group (retriable: " << group.IsRetriable()
            << ") (owner id: " << group.OwnerId() << ") (time counter: "
-           << group.GetAssignedTaskTime().time_since_epoch().count() << "):\n";
+           << absl::FormatTime(group.GetAssignedTaskTime(), absl::UTCTimeZone())
+           << "):\n";

     int64_t worker_index = 0;
     for (auto &worker : group.GetAllWorkers()) {
@@ -113,8 +115,9 @@ std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(
             << "Can't find memory usage for PID, reporting zero. PID: " << pid;
       }
       result << "Worker time counter "
-             << worker->GetAssignedTaskTime().time_since_epoch().count() << " worker id "
-             << worker->WorkerId() << " memory used " << used_memory << " task spec "
+             << absl::FormatTime(worker->GetAssignedTaskTime(), absl::UTCTimeZone())
+             << " worker id " << worker->WorkerId() << " memory used " << used_memory
+             << " task spec "
              << worker->GetAssignedTask().GetTaskSpecification().DebugString() << "\n";
       worker_index += 1;
@@ -136,9 +139,7 @@ TaskID Group::OwnerId() const { return owner_id_; }

 bool Group::IsRetriable() const { return retriable_; }

-const std::chrono::steady_clock::time_point Group::GetAssignedTaskTime() const {
-  return time_;
-}
+const absl::Time Group::GetAssignedTaskTime() const { return time_; }

 void Group::AddToGroup(std::shared_ptr<WorkerInterface> worker) {
   if (worker->GetAssignedTaskTime() < time_) {

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h
index b958166c0db4..d274a1701553 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.h
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h
@@ -17,6 +17,8 @@
 #include
 #include "absl/container/flat_hash_set.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
 #include "ray/common/memory_monitor.h"
 #include "ray/raylet/worker.h"
 #include "ray/raylet/worker_killing_policy.h"
@@ -45,7 +47,7 @@ class Group {
   /// Gets the task time of the earliest task of this group, to be
   /// used for group priority.
-  const std::chrono::steady_clock::time_point GetAssignedTaskTime() const;
+  const absl::Time GetAssignedTaskTime() const;

   /// Returns the worker to be killed in this group, in LIFO order.
   const std::shared_ptr<WorkerInterface> SelectWorkerToKill() const;
@@ -61,7 +63,7 @@ class Group {
   std::vector<std::shared_ptr<WorkerInterface>> workers_;

   /// The earliest creation time of the tasks.
-  std::chrono::steady_clock::time_point time_ = std::chrono::steady_clock::now();
+  absl::Time time_ = absl::Now();

   /// The owner id shared by tasks of this group.
 TaskID owner_id_;

From 2675d6561dd1fbab7fb63d26deee52937e788d3d Mon Sep 17 00:00:00 2001
From: Clarence Ng
Date: Mon, 16 Jan 2023 14:57:19 -0800
Subject: [PATCH 13/16] [core] oom killer policy: group by owner id

Signed-off-by: Clarence Ng
---
 src/ray/raylet/test/util.h                     |  9 ++---
 src/ray/raylet/worker_killing_policy.cc        |  2 +-
 .../worker_killing_policy_group_by_owner.cc    | 40 ++++++++++---------
 .../worker_killing_policy_group_by_owner.h     | 13 +++---
 ...rker_killing_policy_group_by_owner_test.cc  | 35 ++++++++++----
 5 files changed, 62 insertions(+), 37 deletions(-)

diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h
index 3fe2ddffc41f..5007a9f491e1 100644
--- a/src/ray/raylet/test/util.h
+++ b/src/ray/raylet/test/util.h
@@ -24,7 +24,8 @@ class MockWorker : public WorkerInterface {
       : worker_id_(worker_id),
         port_(port),
         is_detached_actor_(false),
-        runtime_env_hash_(runtime_env_hash) {}
+        runtime_env_hash_(runtime_env_hash),
+        job_id_(JobID::FromInt(859)) {}

   WorkerID WorkerId() const override { return worker_id_; }

@@ -107,10 +108,7 @@ class MockWorker : public WorkerInterface {
     auto *t = new std::unordered_set();
     return *t;
   }
-  const JobID &GetAssignedJobId() const override {
-    RAY_CHECK(false) << "Method unused";
-    return JobID::Nil();
-  }
+  const JobID &GetAssignedJobId() const override { return job_id_; }
   int GetRuntimeEnvHash() const override { return runtime_env_hash_; }
   void AssignActorId(const ActorID &actor_id) override {
     RAY_CHECK(false) << "Method unused";
@@ -186,6 +185,7 @@ class MockWorker : public WorkerInterface {
   absl::Time task_assign_time_;
   int runtime_env_hash_;
   TaskID task_id_;
+  JobID job_id_;
 };

 }  // namespace raylet

diff --git a/src/ray/raylet/worker_killing_policy.cc b/src/ray/raylet/worker_killing_policy.cc
index 671090588292..7fe218224a89 100644
--- a/src/ray/raylet/worker_killing_policy.cc
+++ b/src/ray/raylet/worker_killing_policy.cc
@@ -77,7 +77,7 @@ std::string WorkerKillingPolicy::WorkersDebugString(
       RAY_LOG_EVERY_MS(INFO, 60000)
           << "Can't find memory usage for PID, reporting zero. PID: " << pid;
     }
-    result << "Worker " << index << ": task assigned time counter "
+    result << "Worker " << index << ": task assigned time "
            << absl::FormatTime(worker->GetAssignedTaskTime(), absl::UTCTimeZone())
            << " worker id " << worker->WorkerId() << " memory used " << used_memory
            << " task spec "

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
index 66015c953da0..ab4fe8d8d886 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
@@ -42,19 +42,23 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill(
     return std::make_pair(nullptr, /*should retry*/ false);
   }

-  std::unordered_map<std::string, Group> group_map;
+  TaskID non_retriable_owner_id = TaskID::Nil();
+  std::unordered_map<TaskID, Group> group_map;
   for (auto worker : workers) {
     bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
+    if (non_retriable_owner_id == TaskID::Nil()) {
+      non_retriable_owner_id = TaskID::FromRandom(worker->GetAssignedJobId());
+    }
     TaskID owner_id = retriable
                           ? worker->GetAssignedTask().GetTaskSpecification().ParentTaskId()
-                          : worker->GetAssignedTaskId();
+                          : non_retriable_owner_id;

-    auto it = group_map.find(owner_id.Binary());
+    auto it = group_map.find(owner_id);
     if (it == group_map.end()) {
       Group group(owner_id, retriable);
       group.AddToGroup(worker);
-      group_map.emplace(owner_id.Binary(), std::move(group));
+      group_map.emplace(owner_id, std::move(group));
     } else {
       auto &group = it->second;
       group.AddToGroup(worker);
@@ -66,28 +70,28 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill(
     sorted.push_back(it->second);
   }

+  /// Prioritizes killing groups that are retriable, else it picks the largest group,
+  /// else it picks the newest group.
   std::sort(
       sorted.begin(), sorted.end(), [](Group const &left, Group const &right) -> bool {
        int left_retriable = left.IsRetriable() ? 0 : 1;
        int right_retriable = right.IsRetriable() ? 0 : 1;
+
        if (left_retriable == right_retriable) {
-          return left.GetAssignedTaskTime() > right.GetAssignedTaskTime();
+          if (left.GetAllWorkers().size() == right.GetAllWorkers().size()) {
+            return left.GetAssignedTaskTime() > right.GetAssignedTaskTime();
+          }
+          return left.GetAllWorkers().size() > right.GetAllWorkers().size();
        }
        return left_retriable < right_retriable;
       });

-  bool should_retry = false;
   Group selected_group = sorted.front();
-  for (Group group : sorted) {
-    if (group.GetAllWorkers().size() > 1) {
-      selected_group = group;
-      should_retry = true;
-      break;
-    }
-  }
+  bool should_retry =
+      selected_group.GetAllWorkers().size() > 1 && selected_group.IsRetriable();
   auto worker_to_kill = selected_group.SelectWorkerToKill();

-  RAY_LOG(INFO) << "Groups sorted based on the policy:\n"
+  RAY_LOG(INFO) << "Sorted list of tasks based on the policy:\n"
                << PolicyDebugString(sorted, system_memory);

   return std::make_pair(worker_to_kill, should_retry);
@@ -98,8 +102,8 @@ std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(
   std::stringstream result;
   int32_t group_index = 0;
   for (auto &group : groups) {
-    result << "Group (retriable: " << group.IsRetriable()
-           << ") (owner id: " << group.OwnerId() << ") (time counter: "
+    result << "Tasks (retriable: " << group.IsRetriable()
+           << ") (parent task id: " << group.OwnerId() << ") (earliest assigned time: "
            << absl::FormatTime(group.GetAssignedTaskTime(), absl::UTCTimeZone())
            << "):\n";
@@ -114,7 +118,7 @@ std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(
         RAY_LOG_EVERY_MS(INFO, 60000)
             << "Can't find memory usage for PID, reporting zero. PID: " << pid;
       }
-      result << "Worker time counter "
+      result << "Task assigned time "
             << absl::FormatTime(worker->GetAssignedTaskTime(), absl::UTCTimeZone())
             << " worker id " << worker->WorkerId() << " memory used " << used_memory
             << " task spec "
             << worker->GetAssignedTask().GetTaskSpecification().DebugString() << "\n";

       worker_index += 1;
@@ -135,9 +139,7 @@ std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(
   return result.str();
 }

-TaskID Group::OwnerId() const { return owner_id_; }
+const TaskID &Group::OwnerId() const { return owner_id_; }

 bool Group::IsRetriable() const { return retriable_; }

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h
index d274a1701553..c1f7049bda21 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.h
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h
@@ -34,13 +34,13 @@ struct GroupKey {
   const TaskID &owner_id;
 };

-class Group {
+struct Group {
  public:
   Group(const TaskID &owner_id, bool retriable)
       : owner_id_(owner_id), retriable_(retriable) {}

   /// The parent task id of the tasks belonging to this group
-  TaskID OwnerId() const;
+  const TaskID &OwnerId() const;

   /// Whether tasks in this group are retriable.
   bool IsRetriable() const;

@@ -73,12 +73,13 @@ class Group {

 /// Groups task by its owner id. Non-retriable task (whether it be task or actor) forms
-/// its own group. Prioritizes killing groups that are retriable first, then in LIFO
-/// order, where each group's order is based on the time of its earliest submitted member.
-/// Within the group it prioritizes killing task in LIFO order of the submission time.
+/// its own group. Prioritizes killing retriable groups first, breaking ties by group
+/// size (larger first) and then by group age (newer first), where the age of a group
+/// is the time of its earliest submitted task. Once a group is selected, its last
+/// submitted task is killed.
 ///
 /// When selecting a worker / task to be killed, it will set the task to-be-killed to be
-/// non-retriable if it is the last member of the group, and retriable otherwise.
+/// non-retriable if it is the last member of the group, and is retriable otherwise.
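
To make the selection order described above concrete, here is a minimal usage sketch (the three worker variables are hypothetical stand-ins in the spirit of the MockWorker test helpers; they are not identifiers from this patch):

    // Two retriable tasks share owner O, so they form one group; the
    // non-retriable task forms its own group. The retriable group wins the
    // sort, and its most recently assigned worker is selected, LIFO.
    GroupByOwnerIdWorkerKillingPolicy policy;
    std::vector<std::shared_ptr<WorkerInterface>> workers = {
        retriable_worker_a,   // owner O, assigned earliest
        retriable_worker_b,   // owner O, assigned last
        non_retriable_worker  // grouped separately
    };
    auto [worker_to_kill, should_retry] =
        policy.SelectWorkerToKill(workers, MemorySnapshot());
    // worker_to_kill == retriable_worker_b (LIFO within the group);
    // should_retry == true because the group still has another member.
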
class GroupByOwnerIdWorkerKillingPolicy : public WorkerKillingPolicy { public: GroupByOwnerIdWorkerKillingPolicy(); diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc index a8a51f95bf73..fcde36e326e2 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner_test.cc @@ -124,8 +124,7 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestNonRetriableBelongsToItsOwnGroupAndLIF ASSERT_EQ(retry, should_not_retry_); } -TEST_F(WorkerKillingGroupByOwnerTest, - TestGroupSortedByFirstSubmittedTaskAndSkipGroupWithOneTaskLeft) { +TEST_F(WorkerKillingGroupByOwnerTest, TestGroupSortedByGroupSizeThenFirstSubmittedTask) { auto first_group_owner_id = TaskID::FromRandom(job_id_); auto second_group_owner_id = TaskID::FromRandom(job_id_); @@ -151,8 +150,8 @@ TEST_F(WorkerKillingGroupByOwnerTest, std::vector, bool>> expected; expected.push_back(std::make_pair(fourth_submitted, should_retry_)); - expected.push_back(std::make_pair(third_submitted, should_retry_)); expected.push_back(std::make_pair(sixth_submitted, should_retry_)); + expected.push_back(std::make_pair(third_submitted, should_retry_)); expected.push_back(std::make_pair(fifth_submitted, should_retry_)); expected.push_back(std::make_pair(second_submitted, should_not_retry_)); expected.push_back(std::make_pair(first_submitted, should_not_retry_)); @@ -177,17 +176,13 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestGroupSortedByRetriableLifo) { TaskID::FromRandom(job_id_), has_retry_); auto third_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( TaskID::FromRandom(job_id_), no_retry_); - auto fourth_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( - TaskID::FromRandom(job_id_), no_retry_); workers.push_back(first_submitted); workers.push_back(second_submitted); workers.push_back(third_submitted); - workers.push_back(fourth_submitted); std::vector, bool>> expected; expected.push_back(std::make_pair(second_submitted, should_not_retry_)); expected.push_back(std::make_pair(first_submitted, should_not_retry_)); - expected.push_back(std::make_pair(fourth_submitted, should_not_retry_)); expected.push_back(std::make_pair(third_submitted, should_not_retry_)); for (const auto &entry : expected) { @@ -202,6 +197,32 @@ TEST_F(WorkerKillingGroupByOwnerTest, TestGroupSortedByRetriableLifo) { } } +TEST_F(WorkerKillingGroupByOwnerTest, + TestMultipleNonRetriableTaskSameGroupAndNotRetried) { + std::vector> workers; + auto first_submitted = WorkerKillingGroupByOwnerTest::CreateActorCreationWorker( + TaskID::FromRandom(job_id_), no_retry_); + auto second_submitted = WorkerKillingGroupByOwnerTest::CreateTaskWorker( + TaskID::FromRandom(job_id_), no_retry_); + workers.push_back(first_submitted); + workers.push_back(second_submitted); + + std::vector, bool>> expected; + expected.push_back(std::make_pair(second_submitted, should_not_retry_)); + expected.push_back(std::make_pair(first_submitted, should_not_retry_)); + + for (const auto &entry : expected) { + auto worker_to_kill_and_should_retry_ = + worker_killing_policy_.SelectWorkerToKill(workers, MemorySnapshot()); + auto worker_to_kill = worker_to_kill_and_should_retry_.first; + bool retry = worker_to_kill_and_should_retry_.second; + ASSERT_EQ(worker_to_kill->WorkerId(), entry.first->WorkerId()); + ASSERT_EQ(retry, entry.second); + workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill), + workers.end()); + } +} + } // 
namespace raylet } // namespace ray From 3269f657f2a3970283966593ebc95a2af568820b Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Tue, 17 Jan 2023 13:53:22 -0800 Subject: [PATCH 14/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- src/ray/raylet/worker.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 8ac630fc3f64..f44c30fc4793 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -298,7 +298,6 @@ class Worker : public WorkerInterface { /// RayTask being assigned to this worker. RayTask assigned_task_; /// Time when the last task was assigned to this worker. - // std::chrono::steady_clock::time_point task_assign_time_; absl::Time task_assign_time_; /// If true, a RPC need to be sent to notify the worker about GCS restarting. bool notify_gcs_restarted_ = false; From 22111698b8eaf49673341f6cb8c1b2e60244d768 Mon Sep 17 00:00:00 2001 From: Clarence Ng Date: Wed, 18 Jan 2023 16:26:42 -0800 Subject: [PATCH 15/16] [core] oom killer policy: group by owner id Signed-off-by: Clarence Ng --- src/ray/common/ray_config_def.h | 1 + .../worker_killing_policy_group_by_owner.cc | 27 +++++++------------ .../worker_killing_policy_group_by_owner.h | 6 +++-- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index c8e71785e041..04373d1caf6d 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -99,6 +99,7 @@ RAY_CONFIG(uint64_t, task_failure_entry_ttl_ms, 15 * 60 * 1000) /// that is not related to running out of memory. Retries indefinitely if the value is -1. RAY_CONFIG(uint64_t, task_oom_retries, 15) +/// The worker killing policy to use, as defined in worker_killing_policy.h. RAY_CONFIG(std::string, worker_killing_policy, "retriable_lifo") /// If the raylet fails to get agent info, we will retry after this interval. diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc index ab4fe8d8d886..e989dc40eafb 100644 --- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc +++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc @@ -46,6 +46,8 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( std::unordered_map group_map; for (auto worker : workers) { bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable(); + /// TODO(clarng): The Nil value is not stable / unique, and cannot be used + /// as the grouping key. Make Nil stable / unique. if (non_retriable_owner_id == TaskID::Nil()) { non_retriable_owner_id = TaskID::FromRandom(worker->GetAssignedJobId()); } @@ -73,7 +75,7 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill( /// Prioritizes killing groups that are retriable, else it picks the largest group, /// else it picks the newest group. std::sort( - sorted.begin(), sorted.end(), [](Group const &left, Group const &right) -> bool { + sorted.begin(), sorted.end(), [](const Group &left, const Group &right) -> bool { int left_retriable = left.IsRetriable() ? 0 : 1; int right_retriable = right.IsRetriable() ? 
        0 : 1;
@@ -141,20 +143,16 @@ std::string GroupByOwnerIdWorkerKillingPolicy::PolicyDebugString(

 const TaskID &Group::OwnerId() const { return owner_id_; }

-bool Group::IsRetriable() const { return retriable_; }
+const bool Group::IsRetriable() const { return retriable_; }

-const absl::Time Group::GetAssignedTaskTime() const { return time_; }
+const absl::Time Group::GetAssignedTaskTime() const { return earliest_task_time_; }

 void Group::AddToGroup(std::shared_ptr<WorkerInterface> worker) {
-  if (worker->GetAssignedTaskTime() < time_) {
-    time_ = worker->GetAssignedTaskTime();
+  if (worker->GetAssignedTaskTime() < earliest_task_time_) {
+    earliest_task_time_ = worker->GetAssignedTaskTime();
   }

   bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
-  if (workers_.empty()) {
-    retriable_ = retriable;
-  } else {
-    RAY_CHECK_EQ(retriable_, retriable);
-  }
+  RAY_CHECK_EQ(retriable_, retriable);
   workers_.push_back(worker);
 }

@@ -166,14 +164,7 @@ const std::shared_ptr<WorkerInterface> Group::SelectWorkerToKill() const {
       sorted.end(),
       [](std::shared_ptr<WorkerInterface> const &left,
          std::shared_ptr<WorkerInterface> const &right) -> bool {
-        int left_retriable =
-            left->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
-        int right_retriable =
-            right->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
-        if (left_retriable == right_retriable) {
-          return left->GetAssignedTaskTime() > right->GetAssignedTaskTime();
-        }
-        return left_retriable < right_retriable;
+        return left->GetAssignedTaskTime() > right->GetAssignedTaskTime();
       });

   return sorted.front();

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.h b/src/ray/raylet/worker_killing_policy_group_by_owner.h
index c1f7049bda21..a2c79fd60207 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.h
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.h
@@ -43,7 +43,7 @@ struct Group {
   const TaskID &OwnerId() const;

   /// Whether tasks in this group are retriable.
-  bool IsRetriable() const;
+  const bool IsRetriable() const;

   /// Gets the task time of the earliest task of this group, to be
   /// used for group priority.
@@ -63,12 +63,14 @@ struct Group {
   std::vector<std::shared_ptr<WorkerInterface>> workers_;

   /// The earliest creation time of the tasks.
-  absl::Time time_ = absl::Now();
+  absl::Time earliest_task_time_ = absl::Now();

   /// The owner id shared by tasks of this group.
+  /// TODO(clarng): make this const and implement move / swap.
   TaskID owner_id_;

   /// Whether the tasks are retriable.
+  /// TODO(clarng): make this const and implement move / swap.
   bool retriable_;
 };

From afcb95172c867926bd3042a097ab3478a3ad4aa5 Mon Sep 17 00:00:00 2001
From: Clarence Ng
Date: Thu, 19 Jan 2023 13:43:45 -0800
Subject: [PATCH 16/16] [core] oom killer policy: group by owner id

Signed-off-by: Clarence Ng
---
 src/ray/raylet/worker_killing_policy_group_by_owner.cc | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/src/ray/raylet/worker_killing_policy_group_by_owner.cc b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
index e989dc40eafb..e455808ab3a1 100644
--- a/src/ray/raylet/worker_killing_policy_group_by_owner.cc
+++ b/src/ray/raylet/worker_killing_policy_group_by_owner.cc
@@ -46,11 +46,6 @@ GroupByOwnerIdWorkerKillingPolicy::SelectWorkerToKill(
   std::unordered_map<TaskID, Group> group_map;
   for (auto worker : workers) {
     bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
-    /// TODO(clarng): The Nil value is not stable / unique, and cannot be used
-    /// as the grouping key. Make Nil stable / unique.
-    if (non_retriable_owner_id == TaskID::Nil()) {
-      non_retriable_owner_id = TaskID::FromRandom(worker->GetAssignedJobId());
-    }
     TaskID owner_id = retriable
                           ? worker->GetAssignedTask().GetTaskSpecification().ParentTaskId()
                           : non_retriable_owner_id;
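
Taken together, the hunks shown here leave the grouping key computed as sketched below. This condensed restatement is illustrative only; GroupingKeyFor is a hypothetical free-standing helper, not a function in the patch, and it reflects that after this final deletion non_retriable_owner_id is never reassigned from TaskID::Nil():

    // Condensed restatement of the final grouping logic: retriable tasks group
    // under their parent task id, while every non-retriable task falls into a
    // single shared group keyed by the Nil task id.
    TaskID GroupingKeyFor(const std::shared_ptr<WorkerInterface> &worker) {
      bool retriable = worker->GetAssignedTask().GetTaskSpecification().IsRetriable();
      return retriable
                 ? worker->GetAssignedTask().GetTaskSpecification().ParentTaskId()
                 : TaskID::Nil();
    }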