Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
scv119 committed Nov 20, 2022
1 parent 9c5b42d commit a4c27e2
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
16 changes: 10 additions & 6 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1202,27 +1202,30 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
const int runtime_env_hash = task_spec.GetRuntimeEnvHash();
for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend();
it++) {
RAY_LOG(INFO) << "0";
if (task_spec.GetLanguage() != it->first->GetLanguage() ||
it->first->GetAssignedJobId() != task_spec.JobId() ||
state.pending_disconnection_workers.count(it->first) > 0 || it->first->IsDead()) {
continue;
}
RAY_LOG(INFO) << "1";

// Skip if the dynamic_options doesn't match.
auto worker_process_ptr = LookupWorkerProcessInfo(it->first->GetStartupToken());
if (worker_process_ptr == nullptr ||
worker_process_ptr->dynamic_options != dynamic_options) {
if (LookupWorkerDynamicOptions(it->first->GetStartupToken()) != dynamic_options) {
continue;
}

RAY_LOG(INFO) << "2";
// These workers are exiting. So skip them.
if (pending_exit_idle_workers_.count(it->first->WorkerId())) {
continue;
}
RAY_LOG(INFO) << "3";
// Skip if the runtime env doesn't match.
if (runtime_env_hash != it->first->GetRuntimeEnvHash()) {
continue;
}
RAY_LOG(INFO) << "4";

state.idle.erase(it->first);
// We can't erase a reverse_iterator.
Expand Down Expand Up @@ -1627,15 +1630,16 @@ void WorkerPool::DeleteRuntimeEnvIfPossible(const std::string &serialized_runtim
}
}

const WorkerPool::WorkerProcessInfo *WorkerPool::LookupWorkerProcessInfo(
const std::vector<std::string> &WorkerPool::LookupWorkerDynamicOptions(
StartupToken token) const {
for (const auto &[lang, state] : states_by_lang_) {
auto it = state.worker_processes.find(token);
if (it != state.worker_processes.end()) {
return &(it->second);
return it->second.dynamic_options;
}
}
return nullptr;
static std::vector<std::string> kNoDynamicOptions;
return kNoDynamicOptions;
}

} // namespace raylet
Expand Down
6 changes: 4 additions & 2 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status);

/// Look up worker's dynamic options by startup token.
/// TODO(scv119): replace dynamic options by runtime_env.
const std::vector<std::string> &LookupWorkerDynamicOptions(StartupToken token) const;

/// Gloabl startup token variable. Incremented once assigned
/// to a worker process and is added to
/// state.worker_processes.
Expand Down Expand Up @@ -697,8 +701,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
const std::string &serialized_runtime_env_context,
const WorkerPool::State &state) const;

const WorkerProcessInfo *LookupWorkerProcessInfo(StartupToken token) const;

/// For Process class for managing subprocesses (e.g. reaping zombies).
instrumented_io_context *io_service_;
/// Node ID of the current node.
Expand Down

0 comments on commit a4c27e2

Please sign in to comment.