[Client] chunked get requests #22100
last_seen_chunk = chunk.chunk_id
yield chunk
return
except grpc.RpcError as e:
Section from here and below is the same as _call_stub
@AmeerHajAli still taking a look.
Looks pretty good. If we can find a way to unify the code between ChunkCollector & _call_get_object, that would be amazing (but nothing came to mind when I looked at it).
python/ray/util/client/dataclient.py
Outdated
elif chunk_id > self.last_seen_chunk + 1:
    # A chunk was skipped. This shouldn't happen in practice since
    # grpc guarantees that chunks will arrive in order.
    self.callback(
        RuntimeError(
            f"Received chunk {chunk_id} when we expected "
            f"{self.last_seen_chunk + 1} for request {response.req_id}"
        )
    )
    return True
Is the else case here fine to ignore?
Should be fine. The else case means we received a chunk that we've already seen, which is safe to ignore (it should already be appended to data).
Maybe log a warning in this case?
@iycheng, @mwtian, can you please take a look?
Mostly looks good. I hope we can follow up with simplification of the async and sync get(). Otherwise it will be a maintenance burden later on.
@@ -377,20 +379,37 @@ def _async_get_object(
 with disable_client_hook():

 def send_get_response(result: Any) -> None:
- """Pushes a GetResponse to the main DataPath loop to send
+ """Pushes GetResponses to the main DataPath loop to send
How much change would it be to consolidate sync and async get()? Maintaining two implementations of get on client and server with chunking and reconnection seems like a burden later on.
I suspect it shouldn't be too bad, opened #22357 to track this
Overall LGTM, two small comments before merging!
# calls ReleaseObject(). So self.asyncio_waiting_data
# is accessed without holding self.lock. Holding the
Just to clarify, should ReleaseObject not be called while holding a lock?
Yes, I think it's to avoid a problem with object refs being cleaned up while holding locks causing deadlock.
@AmeerHajAli ready to merge
Reverts ray-project#22100. linux://python/ray/tests:test_runtime_env_working_dir_remote_uri becomes very flaky after this PR.
Why are these changes needed?
Switches GetObject from unary-unary to unary-streaming so that large objects can be streamed across multiple messages (currently hardcoded to 64MiB chunks). This will allow users to retrieve objects larger than 2GiB from a remote cluster. If the transfer is interrupted by a recoverable gRPC error (e.g. a temporary disconnect), then the request will be retried starting from the first chunk that hasn't been received yet.
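As a rough illustration of the chunk arithmetic described above, here is a minimal sketch. The helper names `num_chunks` and `resume_offset` are hypothetical and not part of this PR; only the 64MiB chunk size comes from the description, and treating an empty object as a single empty chunk is an assumption.

```python
# Hypothetical helpers, not taken from the PR's code.
CHUNK_SIZE = 64 * 2**20  # 64 MiB, the chunk size mentioned in the description


def num_chunks(total_size: int, chunk_size: int = CHUNK_SIZE) -> int:
    """Number of chunks needed to carry total_size bytes."""
    # Integer ceiling division. Assumption: an empty object still
    # takes one (empty) chunk so the receiver gets a response.
    return max(1, -(-total_size // chunk_size))


def resume_offset(start_chunk_id: int, chunk_size: int = CHUNK_SIZE) -> int:
    """Byte offset at which a retried transfer would resume."""
    return start_chunk_id * chunk_size


if __name__ == "__main__":
    # A 5 GiB object split into 64 MiB chunks.
    print(num_chunks(5 * 2**30))  # 80
```

With these numbers, a 5GiB object spans 80 chunks, and a retry that starts at chunk 3 skips the first 192MiB.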
Proto changes
GetRequests now have the field start_chunk_id, to indicate which chunk to start from (useful if we have to retry a request after already receiving some chunks). GetResponses now have a chunk_id (0-indexed chunk of the serialized object), total_chunks (total number of chunks, used in async transfers to determine when all chunks have been received), and total_size (the total size of the object in bytes, used to raise user warnings if the object being retrieved is very large).
Server changes
Mainly just updating the GetObject logic to yield chunks instead of returning a single response.
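A minimal sketch of what yielding chunks could look like on the server side. `ChunkResponse` is a hypothetical dataclass standing in for the GetResponse proto, and `iter_chunks` is an illustrative name; neither is taken from the PR's actual code.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class ChunkResponse:
    """Hypothetical stand-in for the GetResponse proto message."""
    chunk_id: int      # 0-indexed position of this chunk
    total_chunks: int  # how many chunks make up the object
    total_size: int    # size of the serialized object in bytes
    data: bytes


def iter_chunks(
    serialized: bytes,
    start_chunk_id: int = 0,
    chunk_size: int = 64 * 2**20,
) -> Iterator[ChunkResponse]:
    """Yield the chunks of `serialized`, beginning at start_chunk_id."""
    total_size = len(serialized)
    # Assumption: an empty object is sent as one empty chunk.
    total_chunks = max(1, -(-total_size // chunk_size))
    for chunk_id in range(start_chunk_id, total_chunks):
        start = chunk_id * chunk_size
        yield ChunkResponse(
            chunk_id=chunk_id,
            total_chunks=total_chunks,
            total_size=total_size,
            data=serialized[start:start + chunk_size],
        )
```

Passing a nonzero start_chunk_id models a retried request: the generator skips the chunks the client already holds.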
Client changes
At the moment, objects can be retrieved directly from the raylet servicer (ray.get) or asynchronously over the datapath (await some_remote_func.remote()). In both cases, the request will error if the chunk isn't valid (server side error) or if a chunk is received out of order (shouldn't happen in practice, since gRPC guarantees that messages in a stream either arrive in order or not at all).
ray.get is fairly straightforward, and changes are mainly to accommodate yielding from the stub instead of taking the value directly.
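The resume-on-retry behavior for the synchronous path can be sketched roughly as follows. This is not the PR's implementation: `TransientError` is a stand-in for a recoverable grpc.RpcError, and `get_with_resume` and `make_flaky_stream` are hypothetical names used to keep the example self-contained.

```python
from typing import Callable, Iterator, List, Tuple


class TransientError(Exception):
    """Stand-in for a recoverable grpc.RpcError (hypothetical)."""


# (chunk_id, total_chunks, data), a simplified view of a GetResponse.
Chunk = Tuple[int, int, bytes]


def get_with_resume(
    open_stream: Callable[[int], Iterator[Chunk]], max_retries: int = 3
) -> bytes:
    """Collect all chunks, resuming after the last one seen on failure."""
    parts: List[bytes] = []
    last_seen_chunk = -1
    for _attempt in range(max_retries + 1):
        try:
            # Ask the server to start at the first chunk we don't have yet.
            for chunk_id, total_chunks, data in open_stream(last_seen_chunk + 1):
                if chunk_id != last_seen_chunk + 1:
                    raise RuntimeError(
                        f"Received chunk {chunk_id} when we expected "
                        f"{last_seen_chunk + 1}"
                    )
                parts.append(data)
                last_seen_chunk = chunk_id
                if chunk_id == total_chunks - 1:
                    return b"".join(parts)
        except TransientError:
            continue  # retry from last_seen_chunk + 1
    raise ConnectionError("Retries exhausted before all chunks arrived")


def make_flaky_stream(payload: List[bytes], fail_at: int):
    """Fake stream that disconnects once, just before chunk `fail_at`."""
    calls: List[int] = []

    def open_stream(start_chunk_id: int) -> Iterator[Chunk]:
        calls.append(start_chunk_id)
        for chunk_id in range(start_chunk_id, len(payload)):
            if len(calls) == 1 and chunk_id == fail_at:
                raise TransientError("temporary disconnect")
            yield chunk_id, len(payload), payload[chunk_id]

    return open_stream, calls
```

In this sketch the second request starts at chunk 1 rather than chunk 0, so data already received is not transferred again.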
await some_remote_func.remote() is similar, but to keep things consistent with other async handling, collecting the chunks is handled by a ChunkCollector, which wraps around the original callback.
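To make the callback-wrapping idea concrete, here is a simplified sketch of a collector in the spirit of ChunkCollector. The class name `ChunkCollectorSketch` and the `FakeGetResponse` dataclass are hypothetical; the skipped-chunk branch mirrors the snippet quoted in the review, and the warning on duplicate chunks reflects the reviewers' suggestion rather than confirmed behavior.

```python
import logging
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class FakeGetResponse:
    """Hypothetical stand-in for a GetResponse received on the datapath."""
    req_id: int
    chunk_id: int
    total_chunks: int
    data: bytes


class ChunkCollectorSketch:
    """Wraps a callback and invokes it once all chunks have arrived."""

    def __init__(self, callback: Callable[[Any], None]):
        self.callback = callback
        self.data = bytearray()
        self.last_seen_chunk = -1

    def __call__(self, response: FakeGetResponse) -> bool:
        """Return True once the request is finished (result or error)."""
        chunk_id = response.chunk_id
        if chunk_id == self.last_seen_chunk + 1:
            # The next expected chunk: append it and advance.
            self.data.extend(response.data)
            self.last_seen_chunk = chunk_id
            if chunk_id == response.total_chunks - 1:
                self.callback(bytes(self.data))
                return True
        elif chunk_id > self.last_seen_chunk + 1:
            # A chunk was skipped; report the error through the callback.
            self.callback(
                RuntimeError(
                    f"Received chunk {chunk_id} when we expected "
                    f"{self.last_seen_chunk + 1} for request {response.req_id}"
                )
            )
            return True
        else:
            # Already-seen chunk (e.g. after a retry): safe to ignore.
            logger.warning(
                "Ignoring duplicate chunk %d for request %d",
                chunk_id, response.req_id,
            )
        return False
```

The wrapped callback sees either the fully assembled bytes or an exception, so code written against the single-response callback would not need to know about chunking.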
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.