diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index bccceb3f138a..b5234380d3b9 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1202,27 +1202,30 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, const int runtime_env_hash = task_spec.GetRuntimeEnvHash(); for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend(); it++) { + RAY_LOG(INFO) << "0"; if (task_spec.GetLanguage() != it->first->GetLanguage() || it->first->GetAssignedJobId() != task_spec.JobId() || state.pending_disconnection_workers.count(it->first) > 0 || it->first->IsDead()) { continue; } + RAY_LOG(INFO) << "1"; // Skip if the dynamic_options doesn't match. - auto worker_process_ptr = LookupWorkerProcessInfo(it->first->GetStartupToken()); - if (worker_process_ptr == nullptr || - worker_process_ptr->dynamic_options != dynamic_options) { + if (LookupWorkerDynamicOptions(it->first->GetStartupToken()) != dynamic_options) { continue; } + RAY_LOG(INFO) << "2"; // These workers are exiting. So skip them. if (pending_exit_idle_workers_.count(it->first->WorkerId())) { continue; } + RAY_LOG(INFO) << "3"; // Skip if the runtime env doesn't match. if (runtime_env_hash != it->first->GetRuntimeEnvHash()) { continue; } + RAY_LOG(INFO) << "4"; state.idle.erase(it->first); // We can't erase a reverse_iterator. @@ -1627,15 +1630,16 @@ void WorkerPool::DeleteRuntimeEnvIfPossible(const std::string &serialized_runtim } } -const WorkerPool::WorkerProcessInfo *WorkerPool::LookupWorkerProcessInfo( +const std::vector &WorkerPool::LookupWorkerDynamicOptions( StartupToken token) const { for (const auto &[lang, state] : states_by_lang_) { auto it = state.worker_processes.find(token); if (it != state.worker_processes.end()) { - return &(it->second); + return it->second.dynamic_options; } } - return nullptr; + static std::vector kNoDynamicOptions; + return kNoDynamicOptions; } } // namespace raylet diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index ff6efa5484b8..45178aea38ae 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -439,6 +439,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { std::shared_ptr worker, PopWorkerStatus status); + /// Look up worker's dynamic options by startup token. + /// TODO(scv119): replace dynamic options by runtime_env. + const std::vector &LookupWorkerDynamicOptions(StartupToken token) const; + /// Gloabl startup token variable. Incremented once assigned /// to a worker process and is added to /// state.worker_processes. @@ -697,8 +701,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { const std::string &serialized_runtime_env_context, const WorkerPool::State &state) const; - const WorkerProcessInfo *LookupWorkerProcessInfo(StartupToken token) const; - /// For Process class for managing subprocesses (e.g. reaping zombies). instrumented_io_context *io_service_; /// Node ID of the current node.