-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Async APIs for the New GcsClient. #46788
[core] Async APIs for the New GcsClient. #46788
Conversation
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
self.inner = NewGcsClient.standalone( | ||
str(address), cluster_id=None, timeout_ms=timeout_ms | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we can share the same underlying GCSClient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes maybe in a next PR
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
…c-no-cpython Signed-off-by: Ruiyang Wang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LG!
// However, Cython can't wrap a Python callable to a stateful C++ std::function. | ||
// | ||
// Fortunately, Cython can convert *pure* Cython functions to C++ function pointers. | ||
// Hence we make this C++ Functor `PyCallback` to wrap Python calls to C++ callbacks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you want to say here? I feel these comments add confusion rather than clarity here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reworded.
|
||
void operator()(Args &&...args) { | ||
PythonGilHolder gil; | ||
PyObject *result = converter(std::forward<Args>(args)...); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This result PyObject will be freed by Python, like there is no leak here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes the result is managed by CPython ref counts, we don't need to do anything here. A rule of thumb is: if you don't Py_INCREF
you don't need to Py_DECREF
.
python/ray/includes/gcs_client.pxi
Outdated
cpython.Py_INCREF(fut) | ||
return fut | ||
|
||
cdef void assign_and_decrement(result, void* fut_ptr): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cdef void assign_and_decrement(result, void* fut_ptr): | |
cdef void assign_and_decrement_fut(result, void* fut_ptr): |
Please also update PR title and description. |
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
OptionalItemPyCallback[int]( | ||
convert_optional_int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory, this is never optional right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If status is not OK, the optional[int] would be nullopt. Otherwise, right it should never be optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added assert
python/ray/includes/gcs_client.pxi
Outdated
check_status_timeout_as_rpc_error(status) | ||
except Exception as e: | ||
return None, e | ||
return b.value(), None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel we should handle optional here. It's error-prone that this method is convert_optional_bool
but it assumes it's never optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Status InternalKVAccessor::Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
const int64_t timeout_ms,
bool &added) {
std::promise<Status> ret_promise;
RAY_CHECK_OK(AsyncInternalKVPut(
ns,
key,
value,
overwrite,
timeout_ms,
[&ret_promise, &added](Status status, std::optional<int> added_num) {
added = static_cast<bool>(added_num.value_or(0));
ret_promise.set_value(status);
}));
return ret_promise.get_future().get();
}
For example, this handles the optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok I will assert for each optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that optional check would never be used because its caller always populates:
client_impl_->GetGcsRpcClient().InternalKVPut(
req,
[callback](const Status &status, const rpc::InternalKVPutReply &reply) {
callback(status, reply.added_num());
},
timeout_ms);
but nonetheless I added assert for all optional converters
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Implements async binding of C++ GcsClient in Python as NewGcsAioClient.
Previously we only have sync GcsClient bindings, ones that blocks on completion. To facilitate GcsAioClient we use python thread pool executor - one dedicated thread blocked for the sync call, whose underlying API is async. This is a big waste and we can do better.
The trick is to play the callback-to-async games wisely. Invoke a C++ async API with a python callback function; the callback serializes the reply data or exception, then switch to the python asyncio thread and complete a future. The future, in turn, is awaited by a converter function that does any python-side treatment (e.g. python protobuf deserialization, or converting to dict), then pass on to user code. The end result is an async method just like Python-native ones.
This PR adds the NewGcsAioClient and uses it as implementation of GcsAioClient by default. Can switch back to the OldGcsAioClient by RAY_USE_OLD_GCS_CLIENT=1.