[runtime env] URI reference refactor #22828
Conversation
@edoakes @architkulkarni @rkooo567 This PR is not completed yet, but I have one question to discuss first. I saw that we also create the runtime env in the client server, but it doesn't delete the URI via RPC. Will we keep this logic in the future? Maybe there are some corner cases that could cause URI leakage (for example, the client server dies before the client job starts). I'm not sure of the real effect.
I believe that code path is a hack and we should remove it!
@architkulkarni @edoakes @rkooo567 This PR is ready to review! I have updated some issues in the
Looks great! Just some minor comments and questions.
- Regarding the `HandleJobStarted` future work item in the PR description, I don't know if it's possible to make it only be called once per job -- @wuisawesome maybe you have some thoughts about this?
- I'm not too familiar with the logic of `num_registered_workers` and `num_starting_workers`, so it's hard for me to review that part carefully. @SongGuyang maybe you could summarize the change, or someone else could review that part?
- It seems like the worker pool is now calling AddURIReference, which creates the runtime env as a side effect. The name might be a little confusing. Shouldn't these methods be called CreateRuntimeEnvIfNeeded and DeleteRuntimeEnvIfNeeded, with the agent internally adjusting the refcounts if needed? I don't feel strongly about this though; I'm fine with leaving it.
return unused_uris

# Don't change URI reference for `client_server` because `client_server` doesn't
# send the `DecreaseRuntimeEnvReference` RPC when the client exits.
Makes sense, thanks for catching this! I think the only downside of this for the user is that when they connect with ray.init() via Ray Client, it will install the runtime environment, and then when the user runs their first task it will install the exact same runtime environment again, which could be slow. I think the downside is not too severe.
async def _setup_runtime_env(
-    serialized_runtime_env, serialized_allocated_resource_instances
+    runtime_env, serialized_runtime_env, serialized_allocated_resource_instances
Seems slightly odd to have both `serialized_runtime_env` and `runtime_env` args; do we need both?
I added `runtime_env` here because I don't want to deserialize twice; it's just a performance optimization.
if r.status == agent_manager_pb2.AgentRpcStatus.AGENT_RPC_STATUS_OK:
    # specific_server.set_serialized_runtime_env(serialized_runtime_env)
Should we remove this comment?
// Kill all the workers that have been started but not registered.
for (const auto &starting_worker : entry.second.starting_worker_processes) {
  procs_to_kill.insert(starting_worker.second.proc);
// Kill all the worker processes.
Should we leave the NOTE in, or does it no longer apply?
I think it still applies. Here we kill all the worker processes when the worker pool is destructed; I didn't modify this logic. But I have redefined `starting_worker_processes` as `worker_processes`, which means we now store the alive processes the whole time, so we can kill all the processes this way.
src/ray/raylet/worker_pool.cc
Outdated
@@ -596,28 +579,35 @@ void WorkerPool::MarkPortAsFree(int port) {
  }
}

static bool RuntimeEnvNotEmpty(const std::string &serialized_runtime_env) {
We currently have `IsRuntimeEnvEmpty()` in `runtime_env_common.h`; does it make sense to use that one?
Good! I will reuse yours.
@@ -746,7 +734,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
  auto &state = GetStateForLanguage(driver->GetLanguage());
  state.registered_drivers.insert(std::move(driver));
  const auto job_id = driver->GetAssignedJobId();
-  all_jobs_[job_id] = job_config;
+  HandleJobStarted(job_id, job_config);
I don't have much context here, do you know why HandleJobStarted wasn't called here previously? Was this a bug?
+1 to Archit's comment. Either way, can we get a unit test to ensure the ref counting behavior is correct and stays correct?
`HandleJobStarted` only does two things: first, add the job to `all_jobs_`; second, eagerly install the runtime env if needed.
Before this PR: `HandleJobStarted` was triggered by the job's publishing channel. So here, in `RegisterDriver`, it was OK to only add the job to `all_jobs_`.
In this PR: to guarantee the runtime-env-creation RPC is only called once, I have modified the logic of `HandleJobStarted`: it now checks whether the job has already been added. So I should also use `HandleJobStarted` here to make sure the runtime-env-creation RPC gets called.
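The idempotency pattern described above can be sketched in a few lines. This is an illustrative Python sketch of the idea, not the actual C++ `WorkerPool` code; the class and field names here are assumptions.

```python
class WorkerPoolSketch:
    """Sketch: handle_job_started must be idempotent because it can be
    called both from driver registration and from the job pub/sub channel."""

    def __init__(self):
        self.all_jobs = {}          # job_id -> job_config
        self.create_env_calls = 0   # stands in for the creation RPC count

    def handle_job_started(self, job_id, job_config):
        if job_id in self.all_jobs:
            return  # already handled; don't send the creation RPC again
        self.all_jobs[job_id] = job_config
        if job_config.get("runtime_env"):
            self.create_env_calls += 1  # eager install, at most once per job
```

With this check, calling the handler twice for the same job triggers only one creation RPC.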
src/ray/raylet/worker_pool.h
Outdated
@@ -461,11 +461,13 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
};

/// Some basic information about the starting worker process.
-/// Some basic information about the starting worker process.
+/// Some basic information about the worker process.
/// |                                                      |
/// +------------------------------------------------------+
///
/// Now, we can delete the runtime env resources safely.
Isn't there still value in keeping this? (Maybe moving it above the Increase/DecreaseURIReference methods)? It's true that the actual adding and deleting of the references is now happening in the agent, but the AddURIReference and DeleteURIReference calls are still coming from this file. So the information here is still accurate and useful, right?
Yeah, I can add a doc comment to the Increase/DecreaseURIReference methods. But it will be much simpler, and entirely different from this one.
@rkooo567 please help review the part of
@edoakes @rkooo567 what do you think about the RPC name?
I think it is okay not to expose the ref count to the worker pool, so “ifNeeded” sounds good to me?
And I will review it today!
I had an initial pass! One concern I have is that gRPC requests are usually unordered afaik, and this could cause issues (if a Delete request comes before Create). Do you know if this is possible when the caller is always the same?
PIP = 3
CONDA = 4

def get_uris_from_runtime_env(self, runtime_env: RuntimeEnv):
Why don't we just put all of them into a single class? I think it could be:

class URIReferenceTable:
    def add_uris(self, runtime_env):
        uris = self.get_uris_from_runtime_env(runtime_env)
        self.increase_reference_for_uris(uris)

?
We can also expose some gRPC endpoints to query ref count information in the future.
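A fuller sketch of the single-class idea suggested above might look like this. This is a minimal illustration under assumed names (the callback shape and method names are not Ray's actual agent code):

```python
from collections import defaultdict

class URIReferenceTable:
    """Sketch of per-URI reference counting in the runtime env agent.
    When a URI's count drops to zero, a callback is invoked so the
    caller can garbage-collect the local resources for that URI."""

    def __init__(self, unused_uri_callback):
        self._counts = defaultdict(int)
        self._unused_uri_callback = unused_uri_callback  # e.g. delete files

    def increase_reference_for_uris(self, uris):
        for uri in uris:
            self._counts[uri] += 1

    def decrease_reference_for_uris(self, uris):
        for uri in uris:
            self._counts[uri] -= 1
            if self._counts[uri] == 0:
                del self._counts[uri]
                self._unused_uri_callback(uri)
```

Keeping the counts in one table like this would also make it straightforward to expose a gRPC endpoint that reports ref-count information later.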
try:
    serialized_env = request.serialized_runtime_env
    runtime_env = RuntimeEnv.deserialize(serialized_env)
    uris = self.get_uris_from_runtime_env(runtime_env)
Can you put these two lines out of the try/except? They seem to be unrelated to the except block (it basically says it failed to parse the runtime env?):

uris = self.get_uris_from_runtime_env(runtime_env)
if request.source_process not in self.reference_exclude_sources:
    self.increase_reference_for_uris(uris)

So something like:

try:
    serialized_env = request.serialized_runtime_env
    runtime_env = RuntimeEnv.deserialize(serialized_env)
except Exception as e:
    self._logger.exception(
        "[Increase] Failed to parse runtime env: " f"{serialized_env}"
    )
    return runtime_env_agent_pb2.CreateRuntimeEnvIfNeededReply(
        status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
        error_message="".join(
            traceback.format_exception(type(e), e, e.__traceback__)
        ),
    )
uris = self.get_uris_from_runtime_env(runtime_env)
if request.source_process not in self.reference_exclude_sources:
    self.increase_reference_for_uris(uris)
@@ -243,7 +318,9 @@ def setup_plugins():
    "Runtime env already failed. "
    f"Env: {serialized_env}, err: {error_message}"
)
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
if request.source_process not in self.reference_exclude_sources:
    self.decrease_reference_for_uris(uris)
The Python code with try/except seems to make correct atomic ref counting difficult. Why don't we refactor the code this way?

increase_ref_count(runtime_env)
status = create_runtime_env(runtime_env)
if status != success:
    decrease_ref_count(runtime_env)
reply(status)
try:
    runtime_env = RuntimeEnv.deserialize(request.serialized_runtime_env)
    uris = self.get_uris_from_runtime_env(runtime_env)
Same comment as above?
@@ -197,24 +186,19 @@ void WorkerPool::update_worker_startup_token_counter() {
  worker_startup_token_counter_ += 1;
}

-void WorkerPool::AddStartingWorkerProcess(
+void WorkerPool::AddWorkerProcess(
What's the motivation for the name change?
Before this PR, we deleted the process entry when the worker finished registration.
In this PR, I changed this logic: the process is kept all the way until the worker exits.
src/ray/raylet/worker_pool.h
Outdated
///
/// `CreateRuntimeEnvIfNeeded` means increasing the reference count for the runtime env
/// and `DeleteRuntimeEnvIfNeeded` means decreasing the reference count. We increase or
/// decrease runtime env reference in the cases below:
Can you mention this happens from the agent now?
actual ref counting
ok
src/ray/raylet/worker_pool.cc
Outdated
-    if (worker_state.starting_worker_processes.count(worker_startup_token) > 0) {
+    auto it = worker_state.worker_processes.find(worker_startup_token);
+    if (it != worker_state.worker_processes.end() &&
+        it->second.num_starting_workers != 0) {
Can `num_starting_workers` be negative?
I think it can't. But I could also change this code to `it->second.num_starting_workers > 0`.
src/ray/raylet/worker_pool.cc
Outdated
auto &state = GetStateForLanguage(worker->GetLanguage());
auto it = state.worker_processes.find(worker->GetStartupToken());
if (it != state.worker_processes.end()) {
  it->second.num_registered_workers--;
Is it impossible that the disconnected worker was a "starting" worker?
It looks like this code could be problematic if that's the case
Good point. I think this could happen with low probability. I have deleted this field and added a field `alive_started_workers` to record the started workers.
src/ray/raylet/worker_pool.cc
Outdated
-    CreateRuntimeEnv(
+    RAY_LOG(DEBUG) << "[dedicated] Creating runtime env for task "
+                   << task_spec.TaskId();
+    CreateRuntimeEnvIfNeeded(
Actually, why don't we call it `GetOrCreateRuntimeEnv`?
src/ray/raylet/worker_pool.cc
Outdated
if (it != state.worker_processes.end()) {
  it->second.num_registered_workers--;
  if (it->second.num_registered_workers == 0 && it->second.num_starting_workers == 0) {
    DeleteRuntimeEnvIfNeeded(it->second.runtime_env_info.serialized_runtime_env());
For this API, my impression is the right name is `DeleteRuntimeEnvIfPossible` or `MarkRuntimeEnvUnused`. Wdyt?
`CreateRuntimeEnvOrGet` and `DeleteRuntimeEnvIfPossible`? I like symmetrical names. 😄
Hmm, imo these two have slightly different semantics to have symmetric names (create is "get or create", and delete is "delete or do nothing"). But it is up to you.
TCP can guarantee the order. I'm not sure if the gRPC local buffer will break this (I guess it won't). But I think there is no consistency issue even if the Delete request comes before Create. If the Delete request comes first, the agent will mark the runtime env unused and put it into the cache. When the Create request comes, if the cache entry has not been evicted, the runtime env will be marked used again. Otherwise, the agent will set up the runtime env again. That would bring extra workload, but nothing serious.
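The out-of-order behavior argued above can be made concrete with a small sketch. This is only an illustration of the described cache semantics under assumed names (`delete_if_possible`, `create_if_needed`), not Ray's actual agent code:

```python
class RuntimeEnvCacheSketch:
    """Sketch: a Delete that arrives before its Create just parks the env
    in an 'unused' cache; a later Create revives it from the cache without
    re-setup, or rebuilds it if the cache entry was already evicted."""

    def __init__(self):
        self.used = set()
        self.unused_cache = set()
        self.setup_calls = 0

    def delete_if_possible(self, env):
        # Harmless even with no prior Create: the env is simply marked
        # unused and becomes eligible for eviction.
        self.used.discard(env)
        self.unused_cache.add(env)

    def create_if_needed(self, env):
        if env in self.unused_cache:
            self.unused_cache.discard(env)  # revived from cache: no re-setup
        elif env not in self.used:
            self.setup_calls += 1  # evicted or brand new: set it up again
        self.used.add(env)
```

In this model the only cost of reordering is a possible extra setup when the cache entry has been evicted; the ref state itself never becomes inconsistent.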
@rkooo567 Take a look again? We will merge today if there are no new comments.
def __init__(
    self,
    uris_parser: Callable[[RuntimeEnv], None],
Hmm, this makes me think all the logic here should just be within the RuntimeEnvAgent class. Having a separate class like this requires you to pass callbacks, which is weird. Can you just remove this class and make these private methods of RuntimeEnvAgent?

class RuntimeEnvAgent:
    def _increase_ref_count():
    def _decrease_ref_count():
self._unused_uris_callback = unused_uris_callback
self._unused_runtime_env_callback = unused_runtime_env_callback
# Don't change URI reference for `client_server` because `client_server` doesn't
# send the `DeleteRuntimeEnvIfPossible` RPC when the client exits.
Can you also comment that here? Otherwise, this part will be very confusing when somebody looks at it.
@@ -216,7 +343,67 @@ def setup_plugins():

    return context

serialized_env = request.serialized_runtime_env
# Create runtime env with retry times. This function won't raise exceptions.
# Returns a tuple which contains result(bool), runtime env context(str), and
Can you follow the proper Python-style docstring (https://docs.ray.io/en/master/ray-contribute/getting-involved.html#code-style)? Also, consider adding type annotations:

async def _create_runtime_env_with_retry(
    runtime_env: RuntimeEnv,
    serialized_runtime_env: str,
    serialized_allocated_resource_instances: str,
) -> Tuple[X, Y, Z]:
    """Blah blah

    Args:

    Returns:
    """
^ last 3 comments!
@rkooo567 Do you remember that you commented here #22828 (comment) to ask me to use a single class? I think the current class is clear and the callback is reasonable. If you insist, I will add a
Yeah, I remember that. However, when I made the suggestion, I didn't know this would require the callback arguments to modify the parent's state. I don't agree this is reasonable & clear, because you don't have any unit test on this class (so there's not much meaning in having this additional layer), and this sort of callback is generally harder to read and is used to avoid poor class structure. I really think we should avoid merging PRs just because we want to merge them before the branch cut. This PR doesn't fix any existing user issues or critical bugs, and no urgent features have a hard dependency on it, so I really don't understand why you are rushing on this. This is how we accumulate lots of tech debt in the codebase... If you really believe the new class is more reasonable, then go ahead. But please at least address these two comments.
I have pushed a new commit here alipay@57f1bbc, but the current PR doesn't change.
#22828 (comment) is this a bug? I heard from some people that GitHub is having an outage now https://www.githubstatus.com/
Oh, no😭
If it takes time to resolve, I can probably just merge it and you can create a follow-up PR for the fix.
@rkooo567 I really appreciate your careful review. It helps me a lot to improve my code. Respect for your effort. But I don't think I'm producing tech debt. I also try to address or respond to all the comments carefully, right? If my PR doesn't meet the requirements for merging, no problem, I will improve it. About the class, I think the benefit is that it makes the responsibilities clear.
Not so urgent. I will wait for GitHub to recover tomorrow. I believe it will not be down for more than one day. 😁
Added a reference table test: b80289e.
Looks like it has conflicts?
Conflicts:
  dashboard/modules/runtime_env/runtime_env_agent.py
  python/ray/tests/test_runtime_env_2.py
  python/ray/util/client/server/proxier.py
  src/ray/protobuf/runtime_env_agent.proto
  src/ray/raylet/agent_manager.cc
  src/ray/raylet/worker_pool.cc
Yep, I merged master just now. Let's merge this if no tests are broken.
serve:test_cli is flaky on master; Mac tests stalled.
@rkooo567 Is it time to merge?
Why are these changes needed?
- Rename the runtime env RPCs to `CreateRuntimeEnvOrGet` and `DeleteRuntimeEnvIfPossible`.
Future works
- We don't remove `RuntimeEnvUris` from the `RuntimeEnv` protobuf in the current PR because GCS also uses those URIs to do GC by runtime_env_manager. We should also clear this.
- `WorkerPool::HandleJobStarted` will be called multiple times for one job, so we should make sure this function is idempotent. Can we change this logic and make this function be called only once?
Related issue number
#21695
Checks
- I've run scripts/format.sh to lint the changes in this PR.