From e056fe7c1115e5b9b165e4610450e8a0089f5b6b Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 22 Jul 2024 12:59:07 -0700 Subject: [PATCH 01/10] big Signed-off-by: Ruiyang Wang --- BUILD.bazel | 6 + python/ray/_private/gcs_aio_client.py | 56 +++- python/ray/includes/common.pxd | 82 ++++- python/ray/includes/gcs_client.pxi | 225 ++++++++++++++ src/ray/gcs/gcs_client/python_callbacks.h | 355 ++++++++++++++++++++++ 5 files changed, 705 insertions(+), 19 deletions(-) create mode 100644 src/ray/gcs/gcs_client/python_callbacks.h diff --git a/BUILD.bazel b/BUILD.bazel index a1988fec965a..0b2993e6876d 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2361,6 +2361,11 @@ flatbuffer_cc_library( out_prefix = "ray/raylet/format/", ) +ray_cc_library( + name = "python_callbacks", + hdrs = ["src/ray/gcs/gcs_client/python_callbacks.h"], +) + pyx_library( name = "_raylet", srcs = glob([ @@ -2389,6 +2394,7 @@ pyx_library( linkstatic = 1, ), deps = [ + ":python_callbacks", "//:core_worker_lib", "//:exported_internal", "//:gcs_server_lib", diff --git a/python/ray/_private/gcs_aio_client.py b/python/ray/_private/gcs_aio_client.py index 0469e2ea1f46..6afd57ea93b1 100644 --- a/python/ray/_private/gcs_aio_client.py +++ b/python/ray/_private/gcs_aio_client.py @@ -1,7 +1,8 @@ +import os import logging from typing import Dict, List, Optional from concurrent.futures import ThreadPoolExecutor -from ray._raylet import GcsClient, JobID +from ray._raylet import GcsClient, NewGcsClient, JobID from ray.core.generated import ( gcs_pb2, ) @@ -14,14 +15,63 @@ # If the arg `executor` in GcsAioClient constructor is set, use it. # Otherwise if env var `GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT` is set, use it. # Otherwise, use 5. +# This is only used for the OldGcsAioClient. GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT = env_integer( "GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT", 5 ) - logger = logging.getLogger(__name__) +class GcsAioClient: + """ + Async GCS client. + + This class is in transition to use the new C++ GcsClient binding. The old + PythonGcsClient binding is not deleted until we are confident that the new + binding is stable. + + Defaults to the new binding. If you want to use the old binding, please + set the environment variable `RAY_USE_OLD_GCS_CLIENT=1`. + """ + + def __new__(cls, *args, **kwargs): + use_old_client = os.getenv("RAY_USE_OLD_GCS_CLIENT") == "1" + logger.debug(f"Using {'old' if use_old_client else 'new'} GCS client") + if use_old_client: + return OldGcsAioClient(*args, **kwargs) + else: + return NewGcsAioClient(*args, **kwargs) + + +class NewGcsAioClient: + def __init__( + self, + address: str = None, + loop=None, + executor=None, + nums_reconnect_retry: int = 5, + ): + # See https://github.com/ray-project/ray/blob/d0b46eff9ddcf9ec7256dd3a6dda33e7fb7ced95/python/ray/_raylet.pyx#L2693 # noqa: E501 + timeout_ms = 1000 * (nums_reconnect_retry + 1) + self.inner = NewGcsClient.standalone( + str(address), cluster_id=None, timeout_ms=timeout_ms + ) + # Forwarded Methods. Not using __getattr__ because we want one fewer layer of + # indirection. + self.internal_kv_get = self.inner.async_internal_kv_get + self.internal_kv_multi_get = self.inner.async_internal_kv_multi_get + self.internal_kv_put = self.inner.async_internal_kv_put + self.internal_kv_del = self.inner.async_internal_kv_del + self.internal_kv_exists = self.inner.async_internal_kv_exists + self.internal_kv_keys = self.inner.async_internal_kv_keys + self.check_alive = self.inner.async_check_alive + self.get_all_job_info = self.inner.async_get_all_job_info + # Forwarded Properties. + self.address = self.inner.address + self.cluster_id = self.inner.cluster_id + + class AsyncProxy: def __init__(self, inner, loop, executor): self.inner = inner @@ -45,7 +95,7 @@ def __getattr__(self, name): return attr -class GcsAioClient: +class OldGcsAioClient: def __init__( self, loop=None, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 602d741a23b0..9a6371f35a2e 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -44,13 +44,16 @@ cdef extern from * namespace "polyfill" nogil: cdef extern from "ray/common/status.h" namespace "ray" nogil: - cdef cppclass StatusCode: + # TODO(ryw) in Cython 3.x we can directly use `cdef enum class CStatusCode` + cdef cppclass CStatusCode "ray::StatusCode": pass + cdef CStatusCode CStatusCode_OK "ray::StatusCode::OK" + c_bool operator==(CStatusCode lhs, CStatusCode rhs) cdef cppclass CRayStatus "ray::Status": CRayStatus() - CRayStatus(StatusCode code, const c_string &msg) - CRayStatus(StatusCode code, const c_string &msg, int rpc_code) + CRayStatus(CStatusCode code, const c_string &msg) + CRayStatus(CStatusCode code, const c_string &msg, int rpc_code) CRayStatus(const CRayStatus &s) @staticmethod @@ -135,7 +138,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: c_string ToString() c_string CodeAsString() - StatusCode code() + CStatusCode code() c_string message() int rpc_code() @@ -145,18 +148,6 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: cdef CRayStatus RayStatus_NotImplemented "Status::NotImplemented"() -cdef extern from "ray/common/status.h" namespace "ray::StatusCode" nogil: - cdef StatusCode StatusCode_OK "OK" - cdef StatusCode StatusCode_OutOfMemory "OutOfMemory" - cdef StatusCode StatusCode_KeyError "KeyError" - cdef StatusCode StatusCode_TypeError "TypeError" - cdef StatusCode StatusCode_Invalid "Invalid" - cdef StatusCode StatusCode_IOError "IOError" - cdef StatusCode StatusCode_UnknownError "UnknownError" - cdef StatusCode StatusCode_NotImplemented "NotImplemented" - cdef StatusCode StatusCode_RedisError "RedisError" - - cdef extern from "ray/common/id.h" namespace "ray" nogil: const CTaskID GenerateTaskId(const CJobID &job_id, const CTaskID &parent_task_id, @@ -376,6 +367,17 @@ cdef extern from "ray/core_worker/common.h" nogil: const CNodeID &GetSpilledNodeID() const const c_bool GetDidSpill() const +cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray" nogil: + # Each type needs default constructors asked by cython. + cdef cppclass PyDefaultCallback: + PyDefaultCallback() + PyDefaultCallback(object py_callback) + cdef cppclass PyMultiItemCallback[Item]: + PyMultiItemCallback() + PyMultiItemCallback(object py_callback) + cdef cppclass BoolConverter: + pass + cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor": pass @@ -385,12 +387,21 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: c_vector[CJobTableData] &result, int64_t timeout_ms) + CRayStatus AsyncGetAll( + const PyDefaultCallback &callback, + int64_t timeout_ms) + cdef cppclass CNodeInfoAccessor "ray::gcs::NodeInfoAccessor": CRayStatus CheckAlive( const c_vector[c_string] &raylet_addresses, int64_t timeout_ms, c_vector[c_bool] &result) + CRayStatus AsyncCheckAlive( + const c_vector[c_string] &raylet_addresses, + int64_t timeout_ms, + const PyMultiItemCallback[BoolConverter] &callback) + CRayStatus DrainNodes( const c_vector[CNodeID] &node_ids, int64_t timeout_ms, @@ -445,6 +456,45 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: int64_t timeout_ms, c_bool &exists) + CRayStatus AsyncInternalKVKeys( + const c_string &ns, + const c_string &prefix, + int64_t timeout_ms, + const PyDefaultCallback &callback) + + CRayStatus AsyncInternalKVGet( + const c_string &ns, + const c_string &key, + int64_t timeout_ms, + const PyDefaultCallback &callback) + + CRayStatus AsyncInternalKVMultiGet( + const c_string &ns, + const c_vector[c_string] &keys, + int64_t timeout_ms, + const PyDefaultCallback &callback) + + CRayStatus AsyncInternalKVPut( + const c_string &ns, + const c_string &key, + const c_string &value, + c_bool overwrite, + int64_t timeout_ms, + const PyDefaultCallback &callback) + + CRayStatus AsyncInternalKVExists( + const c_string &ns, + const c_string &key, + int64_t timeout_ms, + const PyDefaultCallback &callback) + + CRayStatus AsyncInternalKVDel( + const c_string &ns, + const c_string &key, + c_bool del_by_prefix, + int64_t timeout_ms, + const PyDefaultCallback &callback) + cdef cppclass CRuntimeEnvAccessor "ray::gcs::RuntimeEnvAccessor": CRayStatus PinRuntimeEnvUri( const c_string &uri, diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 5552ace199aa..bd96cb216dcb 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -13,6 +13,30 @@ Binding of C++ ray::gcs::GcsClient. # out to a separate translation unit because we need to access the singleton thread. # # We need to best-effort import everything we need. +# +# Implementation Notes: +# +# Async API +# +# One challenge is that the C++ async API is callback-based, and the callbacks are +# invoked in the C++ threads. In `make_future_and_callback` we create a future and a +# callback, and the callback will fulfill the future in the event loop thread. The +# future is returned to Python to await, and the callback is passed to the C++ async +# API. Once C++ async API invokes the callback, the future is fulfilled in the Python +# event loop thread. +# +# Marshalling +# +# The C++ API returns ints, strings, `ray::Status` and C++ protobuf types. We need to +# convert them to Python types. In `python_callbacks.h` we define a series of converters +# with Cpython APIs: +# +# - bools, ints and strings are converted using `PyBool_FromLong` and alike. +# - `ray::Status` is marshalled to a 3-tuple and unmarshall it back to `CRayStatus` via +# `to_c_ray_status`. +# - C++ protobuf types are serialized them to bytes, passed to Python and deserialized +# in the Python `postprocess` functions. Later if we need performance for specific +# methods we can add a custom Converter in `python_callbacks.h`. from asyncio import Future from typing import List @@ -20,6 +44,11 @@ from ray.includes.common cimport ( CGcsClient, CGetAllResourceUsageReply, ConnectOnSingletonIoContext, + CStatusCode, + CStatusCode_OK, + PyDefaultCallback, + PyMultiItemCallback, + BoolConverter, ) from ray.core.generated import gcs_pb2 from cython.operator import dereference, postincrement @@ -170,6 +199,101 @@ cdef class NewGcsClient: ) return exists + ############################################################# + # Internal KV async methods + ############################################################# + + def async_internal_kv_get( + self, c_string key, namespace=None, timeout=None + ) -> Future[Optional[bytes]]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + + def postprocess(tup: Tuple[StatusParts, Any]): + status_parts, val = tup + cdef CRayStatus c_ray_status = to_c_ray_status(status_parts) + if c_ray_status.IsNotFound(): + return None + check_status_timeout_as_rpc_error(c_ray_status) + return val + fut, cb = make_future_and_callback(postprocess=postprocess) + cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVGet( + ns, key, timeout_ms, cy_callback)) + return fut + + def async_internal_kv_multi_get( + self, keys: List[bytes], namespace=None, timeout=None + ) -> Future[Dict[bytes, bytes]]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_vector[c_string] c_keys = [key for key in keys] + fut, cb = make_future_and_callback(postprocess=check_status_or_return) + cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVMultiGet( + ns, c_keys, timeout_ms, cy_callback)) + return fut + + def async_internal_kv_put( + self, c_string key, c_string value, c_bool overwrite=False, namespace=None, + timeout=None + ) -> Future[int]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut, cb = make_future_and_callback(postprocess=check_status_or_return) + cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVPut( + ns, key, value, overwrite, timeout_ms, cy_callback)) + return fut + + def async_internal_kv_del(self, c_string key, c_bool del_by_prefix, + namespace=None, timeout=None) -> Future[int]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut, cb = make_future_and_callback(postprocess=check_status_or_return) + cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVDel( + ns, key, del_by_prefix, timeout_ms, cy_callback)) + return fut + + def async_internal_kv_keys(self, c_string prefix, namespace=None, timeout=None + ) -> Future[List[bytes]]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut, cb = make_future_and_callback(postprocess=check_status_or_return) + cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVKeys( + ns, prefix, timeout_ms, cy_callback)) + return fut + + def async_internal_kv_exists(self, c_string key, namespace=None, timeout=None + ) -> Future[bool]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut, cb = make_future_and_callback(postprocess=check_status_or_return) + cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVExists( + ns, key, timeout_ms, cy_callback)) + return fut + ############################################################# # NodeInfo methods ############################################################# @@ -186,6 +310,21 @@ cdef class NewGcsClient: ) return [result for result in results] + def async_check_alive( + self, node_ips: List[bytes], timeout: Optional[float] = None + ) -> Future[List[bool]]: + cdef: + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_vector[c_string] c_node_ips = [ip for ip in node_ips] + fut, cb = make_future_and_callback(postprocess=check_status_or_return) + cdef PyMultiItemCallback[BoolConverter] cy_callback = \ + PyMultiItemCallback[BoolConverter](cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().Nodes().AsyncCheckAlive( + c_node_ips, timeout_ms, cy_callback)) + return fut + def drain_nodes( self, node_ids: List[bytes], timeout: Optional[float] = None ) -> List[bytes]: @@ -263,6 +402,26 @@ cdef class NewGcsClient: ret[JobID.from_binary(proto.job_id)] = proto return ret + def async_get_all_job_info( + self, timeout: Optional[float] = None + ) -> Future[Dict[str, gcs_pb2.JobTableData]]: + cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + + def postprocess(binary): + list_of_bytes: List[bytes] = check_status_or_return(binary) + job_table_data = {} + for b in list_of_bytes: + proto = gcs_pb2.JobTableData() + proto.ParseFromString(b) + job_table_data[proto.job_id] = proto + return job_table_data + fut, cb = make_future_and_callback(postprocess=postprocess) + cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().Jobs().AsyncGetAll(cy_callback, timeout_ms)) + return fut + ############################################################# # Runtime Env methods ############################################################# @@ -360,3 +519,69 @@ cdef class NewGcsClient: rejection_reason_message)) return (is_accepted, rejection_reason_message.decode()) + + +# Util functions for async handling + +# Ideally we want to pass CRayStatus around. However it's not easy to wrap a +# `ray::Status` to a `PythonObject*` so we marshall it to a 3-tuple like this. It can be +# unmarshalled to CRayStatus with `to_c_ray_status`. +StatusParts = Tuple[int, str, int] + +cdef CRayStatus to_c_ray_status(tup: StatusParts): + cdef: + uint8_t code = tup[0] + CStatusCode status_code = (code) + c_string msg = tup[1] + int rpc_code = tup[2] + CRayStatus s + if status_code == CStatusCode_OK: + return CRayStatus.OK() + s = CRayStatus(status_code, msg, rpc_code) + return s + + +def check_status_parts(parts: StatusParts): + check_status_timeout_as_rpc_error(to_c_ray_status(parts)) + + +def check_status_or_return(tup: Tuple[StatusParts, Any]): + status_parts, val = tup + check_status_parts(status_parts) + return val + + +cdef make_future_and_callback(postprocess=None): + """ + Prepares a series of async call and returns (future, callback). + In runtime it's in this order: + - Async API invoked. + - if it returns non-OK, the async call raises. + - Async API invokes `callback`, in the C++ thread + - `callback` sends the result to the event loop thread and fulfill `fut`. + - `run_postprocess` awaits `fut`, invokes `postprocess` and fulfill `fut2`. + - `fut2` is what we return to the user. + + Params: + `postprocess` is a sync function that returns transformed value, may raise. + """ + loop = asyncio.get_event_loop() + fut = loop.create_future() + + def callback(result, exc): + # May run in in C++ thread + if fut.cancelled(): + return + if exc is not None: + loop.call_soon_threadsafe(fut.set_exception, exc) + else: + loop.call_soon_threadsafe(fut.set_result, result) + + async def run_postprocess(fut, postprocess): + result = await fut + if postprocess is None: + return result + else: + return postprocess(result) + + return run_postprocess(fut, postprocess), callback diff --git a/src/ray/gcs/gcs_client/python_callbacks.h b/src/ray/gcs/gcs_client/python_callbacks.h new file mode 100644 index 000000000000..4d340511055a --- /dev/null +++ b/src/ray/gcs/gcs_client/python_callbacks.h @@ -0,0 +1,355 @@ +// Copyright 2024 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "ray/common/status.h" +#include "ray/util/logging.h" + +namespace ray { + +//// Converters +// Converts C++ types to PyObject*. +// None of the converters hold the GIL, but all of them requires the GIL to be held. +// This means the caller should hold the GIL before calling these functions. +// +// A Converter is a functor that takes a C++ type and returns a PyObject*. +// The conversion may fail, in which case the converter should set an exception and return +// nullptr. +// +// By default you can use `DefaultConverter::convert` to convert any type. If you need +// special handling you can compose out your own. +class BytesConverter { + // Serializes the message to a string. Returns false if the serialization fails. + // template + // static bool serialize(const T &message, std::string &result); + + // Specialization for types with a SerializeToString method + template + static bool serialize(const Message &message, + std::string &result, + typename std::enable_if::value>::type * = 0) { + return message.SerializeToString(&result); + } + + // Specialization for types with a Binary method, i.e. `BaseID`s + // Never fails. + template + static bool serialize( + const ID &id, + std::string &result, + typename std::enable_if< + std::is_member_function_pointer::value>::type * = 0) { + result = id.Binary(); + return true; + } + + public: + template + static PyObject *convert(const T &arg) { + std::string serialized_message; + if (serialize(arg, serialized_message)) { + return PyBytes_FromStringAndSize(serialized_message.data(), + serialized_message.size()); + } else { + PyErr_SetString(PyExc_ValueError, "Failed to serialize message."); + Py_RETURN_NONE; + } + } + + static PyObject *convert(const std::string &arg) { + return PyBytes_FromStringAndSize(arg.data(), arg.size()); + } +}; + +class BoolConverter { + public: + static PyObject *convert(bool arg) { return PyBool_FromLong(arg); } +}; + +class IntConverter { + public: + static PyObject *convert(int arg) { return PyLong_FromLong(arg); } +}; + +template +class OptionalConverter { + public: + template + static PyObject *convert(const boost::optional &arg) { + if (arg) { + return Inner::convert(*arg); + } else { + Py_RETURN_NONE; + } + } +}; + +template +class VectorConverter { + public: + template + static PyObject *convert(const std::vector &arg) { + PyObject *list = PyList_New(arg.size()); + for (size_t i = 0; i < arg.size(); i++) { + PyObject *item = Inner::convert(arg[i]); + if (item == nullptr) { + // Failed to convert an item. Free the list and all items within. + Py_DECREF(list); + return nullptr; + } + PyList_SetItem(list, i, Inner::convert(arg[i])); + } + return list; + } +}; + +// Returns a Python tuple of two elements. +template +class PairConverter { + public: + template + static PyObject *convert(const T &left, const U &right) { + PyObject *result = PyTuple_New(2); + PyObject *left_obj = Left::convert(left); + if (left_obj == nullptr) { + Py_DECREF(result); + return nullptr; + } + PyTuple_SetItem(result, 0, left_obj); + PyObject *right_obj = Right::convert(right); + if (right_obj == nullptr) { + Py_DECREF(result); + return nullptr; + } + PyTuple_SetItem(result, 1, right_obj); + return result; + } +}; + +template +class MapConverter { + public: + template + static PyObject *convert(const MapType &arg) { + PyObject *dict = PyDict_New(); + for (const auto &pair : arg) { + PyObject *key = KeyConverter::convert(pair.first); + if (key == nullptr) { + Py_DECREF(dict); + return nullptr; + } + PyObject *value = ValueConverter::convert(pair.second); + if (value == nullptr) { + Py_DECREF(key); + Py_DECREF(dict); + return nullptr; + } + if (PyDict_SetItem(dict, key, value) < 0) { + Py_DECREF(key); + Py_DECREF(value); + Py_DECREF(dict); + return nullptr; + } + Py_XDECREF(key); + Py_XDECREF(value); + } + return dict; + } +}; + +// Converts ray::Status to Tuple[StatusCode, error_message, rpc_code] +class StatusConverter { + public: + static PyObject *convert(const ray::Status &status) { + static_assert(std::is_same_v>, + "StatusCode underlying type should be char."); + return PyTuple_Pack(3, + IntConverter::convert(static_cast(status.code())), + BytesConverter::convert(status.message()), + IntConverter::convert(status.rpc_code())); + } +}; + +// Default converter, converts all types implemented above. +// Resolution: +// - single bool, int, status: BoolConverter, IntConverter, StatusConverter +// - optional: OptionalConverter +// - vector: VectorConverter +// - single generic argument: BytesConverter +// - 2 args (T, U): PairConverter +// - map: MapConverter (we can't do generics over MapType for +// collision w/ BytesConverter, so we specialize for std::unordered_map and +// absl::flat_hash_map) +class DefaultConverter { + public: + static PyObject *convert(bool arg) { return BoolConverter::convert(arg); } + static PyObject *convert(int arg) { return IntConverter::convert(arg); } + static PyObject *convert(const Status &arg) { return StatusConverter::convert(arg); } + + template + static PyObject *convert(const T &arg) { + return BytesConverter::convert(arg); + } + template + static PyObject *convert(const boost::optional &arg) { + return OptionalConverter::convert(arg); + } + template + static PyObject *convert(const std::vector &arg) { + return VectorConverter::convert(arg); + } + template + static PyObject *convert(const T &left, const U &right) { + return PairConverter::convert(left, right); + } + template + static PyObject *convert(const std::unordered_map &arg) { + return MapConverter::convert(arg); + } + template + static PyObject *convert(const absl::flat_hash_map &arg) { + return MapConverter::convert(arg); + } +}; + +// Wraps a Python `Callable[[T, Exception], None]` into a C++ `std::function`. +// This is a base class for all the callbacks, with subclass handling the conversion. +// The base class handles: +// - GIL management +// - PyObject Ref counting +// - Exception handling. +// +// `Converter`: a functor that converts U to owned PyObject*. +// +// TODO: For all these we also need to handle exc. We do this by giving the py_callable +// another arg so it becomes Callable[[T, Exception], None]. The exception is a +// 3-tuple of (type, value, tb). +// TODO: Subscribe need iterator/generator. +template +class PyCallback { + private: + class PythonGilHolder { + public: + PythonGilHolder() : state_(PyGILState_Ensure()) {} + ~PythonGilHolder() { PyGILState_Release(state_); } + + private: + PyGILState_STATE state_; + }; + + public: + // Ref counted Python object. + // This callback class is copyable, and as a std::function it's indeed copied around. + // But we don't want to hold the GIL and do ref counting so many times, so we use a + // shared_ptr to manage the ref count for us, and only call Py_XINCREF once. + // + // Note this ref counted is only from C++ side. If this ref count goes to 0, we only + // Issue 1 decref, which does not necessarily free the object. + std::shared_ptr py_callable; + + // Needed by Cython to place a placeholder object. + PyCallback() {} + + PyCallback(PyObject *callable) { + if (callable == nullptr) { + py_callable = nullptr; + return; + } + PythonGilHolder gil; + Py_XINCREF(callable); + + py_callable = std::shared_ptr(callable, [](PyObject *obj) { + PythonGilHolder gil; + Py_XDECREF(obj); + }); + } + + // Typically Args is just a single argument, but we allow multiple arguments for e.g. + // (Status, boost::optional). + // + // Converts the arguments to a Python Object and may raise exceptions. + // Invokes the Python callable with (converted_arg, None), or (None, exception). + // + // The callback should not return anything and should not raise exceptions. If it + // raises, we catch it and log it. + template + void operator()(const Args &...args) { + if (py_callable == nullptr) { + return; + } + + // Hold the GIL. + PythonGilHolder gil; + PyObject *arg_obj = Converter::convert(args...); + if (arg_obj == nullptr) { + // Failed to convert the arguments. The exception is set by the converter. + PyObject *exc_obj = PyErr_Occurred(); + if (exc_obj == nullptr) { + // The converter didn't set an exception? + RAY_LOG(WARNING) << "Failed to convert arguments, but no exception set."; + PyErr_SetString(PyExc_ValueError, "Failed to convert arguments."); + exc_obj = PyErr_Occurred(); + } + PyObject *result = + PyObject_CallFunctionObjArgs(py_callable.get(), Py_None, exc_obj, NULL); + Py_XDECREF(result); + } else { + PyObject *result = + PyObject_CallFunctionObjArgs(py_callable.get(), arg_obj, Py_None, NULL); + Py_XDECREF(arg_obj); + Py_XDECREF(result); + } + if (PyErr_Occurred()) { + // Our binding code raised exceptions. Not much we can do here. Print it out. + PyObject *ptype, *pvalue, *ptraceback; + PyErr_Fetch(&ptype, &pvalue, &ptraceback); + PyErr_NormalizeException(&ptype, &pvalue, &ptraceback); + PyObject *str_exc = PyObject_Str(pvalue); + const char *exc_str = PyUnicode_AsUTF8(str_exc); + + RAY_LOG(ERROR) << "Python exception in cpython binding callback: " << exc_str; + + // Clean up + Py_XDECREF(ptype); + Py_XDECREF(pvalue); + Py_XDECREF(ptraceback); + Py_XDECREF(str_exc); + + PyErr_Clear(); + } + } +}; + +// Concrete callback types. +// Most types are using the DefaultConverter, but we allow specialization for some types. +using PyDefaultCallback = PyCallback; +// Specialization for a pair of (Status, Vector). +// We need this because `MultiItemCallback` uses (Status, std::vector&&) not const ref. +// and C++ can't deduce it well (will implicit convert the vector&& to bool) +template +using PyMultiItemCallback = + PyCallback>>; + +} // namespace ray From c3557ebb8cb7d49bf1e71ee4db69c58e21cd794a Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 24 Jul 2024 12:49:32 -0700 Subject: [PATCH 02/10] remove Cpython code, rely on Cython to do conversion Signed-off-by: Ruiyang Wang --- BUILD.bazel | 15 +- python/ray/includes/common.pxd | 63 ++-- python/ray/includes/gcs_client.pxi | 274 +++++++++------- src/ray/gcs/gcs_client/python_callbacks.h | 369 ++++------------------ 4 files changed, 270 insertions(+), 451 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 0b2993e6876d..b6a17084f459 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -7,14 +7,14 @@ # If you would like to help with the move in your PR, please use `git mv` so that the history of the file is retained. load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") -load("@rules_proto//proto:defs.bzl", "proto_library") -load("@rules_python//python:defs.bzl", "py_library", "py_runtime", "py_runtime_pair") -load("@rules_cc//cc:defs.bzl", "cc_proto_library") +load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library") load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library") -load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library") -load("//bazel:ray.bzl", "COPTS", "PYX_COPTS", "PYX_SRCS", "copy_to_workspace", "ray_cc_binary", "ray_cc_library", "ray_cc_test") load("@python3_9//:defs.bzl", python39 = "interpreter") +load("@rules_cc//cc:defs.bzl", "cc_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@rules_python//python:defs.bzl", "py_library", "py_runtime", "py_runtime_pair") +load("//bazel:ray.bzl", "COPTS", "PYX_COPTS", "PYX_SRCS", "copy_to_workspace", "ray_cc_binary", "ray_cc_library", "ray_cc_test") package( default_visibility = ["//visibility:public"], @@ -2363,7 +2363,10 @@ flatbuffer_cc_library( ray_cc_library( name = "python_callbacks", - hdrs = ["src/ray/gcs/gcs_client/python_callbacks.h"], + hdrs = [ + "src/ray/gcs/callback.h", + "src/ray/gcs/gcs_client/python_callbacks.h", + ], ) pyx_library( diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 9a6371f35a2e..6fecfc10e68b 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -6,6 +6,7 @@ from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t, uint32_t from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector from libcpp.pair cimport pair as c_pair +from libcpp.functional cimport function from ray.includes.optional cimport ( optional, ) @@ -367,16 +368,42 @@ cdef extern from "ray/core_worker/common.h" nogil: const CNodeID &GetSpilledNodeID() const const c_bool GetDidSpill() const -cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray" nogil: - # Each type needs default constructors asked by cython. - cdef cppclass PyDefaultCallback: - PyDefaultCallback() - PyDefaultCallback(object py_callback) - cdef cppclass PyMultiItemCallback[Item]: - PyMultiItemCallback() - PyMultiItemCallback(object py_callback) - cdef cppclass BoolConverter: - pass +cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs": + # Facility for async bindings. C++ APIs need a callback (`std::function`), + # and in the callback we want to do type conversion and complete the Python future. + # However, Cython can't wrap a Python callable to a stateful C++ std::function. + + # Fortunately, Cython can convert *pure* Cython functions to C++ function pointers. + # Hence we make this C++ Functor `CpsHandler` to wrap Python calls to C++ callbacks. + + # Different APIs have different type signatures, but the code of completing the future + # is the same. So we ask 2 Cython function pointers: `Handler` and `Continuation`. + # `Handler` is unique for each API, converting C++ types to Python types. `Continuation` + # is shared by all APIs, completing the Python future. + + # One issue is the `Continuation` have to be stateless, so we need to keep the Future + # in the functor. But we don't want to expose the Future to C++ too much, so we keep + # it as a void*. For that, we Py_INCREF the Future in the functor and Py_DECREF it + # after the completion. + + # On C++ async API calling: + # 1. Create a Future. + # 2. Creates a `CpsHandler` functor with `Handler` and `Continuation` and the Future. + # 3. Invokes the async API with the functor. + + # On C++ async API completion: + # 1. The CpsHandler functor is called. It acquires GIL and: + # 2. The functor calls the Cython function `Handler` with C++ types. It returns + # # `Tuple[result, exception]`. + # 3. The functor calls the Cython function `Continuation` with the tuple and the + # # Future (as void*). It completes the Python future with the result or with the + # # exception. + + cdef cppclass MultiItemCpsHandler[T]: + MultiItemCpsHandler(object (*)(CRayStatus, c_vector[T] &&) , void (object, void*), void*) nogil + + cdef cppclass OptionalItemCpsHandler[T]: + OptionalItemCpsHandler(object (*)(CRayStatus, const optional[T]&) , void (object, void*), void*) nogil cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor": @@ -388,7 +415,7 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: int64_t timeout_ms) CRayStatus AsyncGetAll( - const PyDefaultCallback &callback, + const MultiItemCpsHandler[CJobTableData] &callback, int64_t timeout_ms) cdef cppclass CNodeInfoAccessor "ray::gcs::NodeInfoAccessor": @@ -400,7 +427,7 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: CRayStatus AsyncCheckAlive( const c_vector[c_string] &raylet_addresses, int64_t timeout_ms, - const PyMultiItemCallback[BoolConverter] &callback) + const MultiItemCpsHandler[c_bool] &callback) CRayStatus DrainNodes( const c_vector[CNodeID] &node_ids, @@ -460,19 +487,19 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: const c_string &ns, const c_string &prefix, int64_t timeout_ms, - const PyDefaultCallback &callback) + const OptionalItemCpsHandler[c_vector[c_string]] &callback) CRayStatus AsyncInternalKVGet( const c_string &ns, const c_string &key, int64_t timeout_ms, - const PyDefaultCallback &callback) + const OptionalItemCpsHandler[c_string] &callback) CRayStatus AsyncInternalKVMultiGet( const c_string &ns, const c_vector[c_string] &keys, int64_t timeout_ms, - const PyDefaultCallback &callback) + const OptionalItemCpsHandler[unordered_map[c_string, c_string]] &callback) CRayStatus AsyncInternalKVPut( const c_string &ns, @@ -480,20 +507,20 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: const c_string &value, c_bool overwrite, int64_t timeout_ms, - const PyDefaultCallback &callback) + const OptionalItemCpsHandler[int] &callback) CRayStatus AsyncInternalKVExists( const c_string &ns, const c_string &key, int64_t timeout_ms, - const PyDefaultCallback &callback) + const OptionalItemCpsHandler[c_bool] &callback) CRayStatus AsyncInternalKVDel( const c_string &ns, const c_string &key, c_bool del_by_prefix, int64_t timeout_ms, - const PyDefaultCallback &callback) + const OptionalItemCpsHandler[int] &callback) cdef cppclass CRuntimeEnvAccessor "ray::gcs::RuntimeEnvAccessor": CRayStatus PinRuntimeEnvUri( diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index bd96cb216dcb..42c03d65008c 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -39,19 +39,23 @@ Binding of C++ ray::gcs::GcsClient. # methods we can add a custom Converter in `python_callbacks.h`. from asyncio import Future -from typing import List +from typing import List, Dict, Any, Tuple, Optional, Callable +from libcpp.utility cimport move +import concurrent.futures from ray.includes.common cimport ( CGcsClient, CGetAllResourceUsageReply, ConnectOnSingletonIoContext, CStatusCode, CStatusCode_OK, - PyDefaultCallback, - PyMultiItemCallback, - BoolConverter, + MultiItemCpsHandler, + OptionalItemCpsHandler, ) +from ray.includes.optional cimport optional from ray.core.generated import gcs_pb2 from cython.operator import dereference, postincrement +cimport cpython + cdef class NewGcsClient: cdef: @@ -209,21 +213,16 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - - def postprocess(tup: Tuple[StatusParts, Any]): - status_parts, val = tup - cdef CRayStatus c_ray_status = to_c_ray_status(status_parts) - if c_ray_status.IsNotFound(): - return None - check_status_timeout_as_rpc_error(c_ray_status) - return val - fut, cb = make_future_and_callback(postprocess=postprocess) - cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + void* fut_ptr = make_fut_ptr() with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVGet( - ns, key, timeout_ms, cy_callback)) - return fut + ns, key, timeout_ms, + OptionalItemCpsHandler[c_string]( + postprocess_optional_str_none_for_not_found, + complete_fut_ptr, + fut_ptr))) + return asyncio.wrap_future(fut_ptr) def async_internal_kv_multi_get( self, keys: List[bytes], namespace=None, timeout=None @@ -232,13 +231,16 @@ cdef class NewGcsClient: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_keys = [key for key in keys] - fut, cb = make_future_and_callback(postprocess=check_status_or_return) - cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + void* fut_ptr = make_fut_ptr() with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVMultiGet( - ns, c_keys, timeout_ms, cy_callback)) - return fut + ns, c_keys, timeout_ms, + OptionalItemCpsHandler[unordered_map[c_string, c_string]]( + postprocess_optional_multi_get, + complete_fut_ptr, + fut_ptr))) + return asyncio.wrap_future(fut_ptr) def async_internal_kv_put( self, c_string key, c_string value, c_bool overwrite=False, namespace=None, @@ -247,52 +249,65 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - fut, cb = make_future_and_callback(postprocess=check_status_or_return) - cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + void* fut_ptr = make_fut_ptr() with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVPut( - ns, key, value, overwrite, timeout_ms, cy_callback)) - return fut + ns, key, value, overwrite, timeout_ms, + OptionalItemCpsHandler[int]( + postprocess_optional_int, + complete_fut_ptr, + fut_ptr))) + return asyncio.wrap_future(fut_ptr) def async_internal_kv_del(self, c_string key, c_bool del_by_prefix, namespace=None, timeout=None) -> Future[int]: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - fut, cb = make_future_and_callback(postprocess=check_status_or_return) - cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + void* fut_ptr = make_fut_ptr() with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVDel( - ns, key, del_by_prefix, timeout_ms, cy_callback)) - return fut + ns, key, del_by_prefix, timeout_ms, + OptionalItemCpsHandler[int]( + postprocess_optional_int, + complete_fut_ptr, + fut_ptr))) + return asyncio.wrap_future(fut_ptr) def async_internal_kv_keys(self, c_string prefix, namespace=None, timeout=None ) -> Future[List[bytes]]: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - fut, cb = make_future_and_callback(postprocess=check_status_or_return) - cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + void* fut_ptr = make_fut_ptr() with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVKeys( - ns, prefix, timeout_ms, cy_callback)) - return fut + ns, prefix, timeout_ms, + OptionalItemCpsHandler[c_vector[c_string]]( + postprocess_optional_vector_str, + complete_fut_ptr, + fut_ptr))) + return asyncio.wrap_future(fut_ptr) def async_internal_kv_exists(self, c_string key, namespace=None, timeout=None ) -> Future[bool]: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - fut, cb = make_future_and_callback(postprocess=check_status_or_return) - cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) + void* fut_ptr = make_fut_ptr() with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVExists( - ns, key, timeout_ms, cy_callback)) - return fut + ns, key, timeout_ms, + OptionalItemCpsHandler[c_bool]( + postprocess_optional_bool, + complete_fut_ptr, + fut_ptr))) + return asyncio.wrap_future(fut_ptr) + ############################################################# # NodeInfo methods @@ -316,14 +331,16 @@ cdef class NewGcsClient: cdef: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_node_ips = [ip for ip in node_ips] - fut, cb = make_future_and_callback(postprocess=check_status_or_return) - cdef PyMultiItemCallback[BoolConverter] cy_callback = \ - PyMultiItemCallback[BoolConverter](cb) + void* fut_ptr = make_fut_ptr() with nogil: check_status_timeout_as_rpc_error( self.inner.get().Nodes().AsyncCheckAlive( - c_node_ips, timeout_ms, cy_callback)) - return fut + c_node_ips, timeout_ms, + MultiItemCpsHandler[c_bool]( + &postprocess_multi_bool, + complete_fut_ptr, + fut_ptr))) + return asyncio.wrap_future(fut_ptr) def drain_nodes( self, node_ids: List[bytes], timeout: Optional[float] = None @@ -406,21 +423,13 @@ cdef class NewGcsClient: self, timeout: Optional[float] = None ) -> Future[Dict[str, gcs_pb2.JobTableData]]: cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + cdef void* fut_ptr = make_fut_ptr() - def postprocess(binary): - list_of_bytes: List[bytes] = check_status_or_return(binary) - job_table_data = {} - for b in list_of_bytes: - proto = gcs_pb2.JobTableData() - proto.ParseFromString(b) - job_table_data[proto.job_id] = proto - return job_table_data - fut, cb = make_future_and_callback(postprocess=postprocess) - cdef PyDefaultCallback cy_callback = PyDefaultCallback(cb) with nogil: check_status_timeout_as_rpc_error( - self.inner.get().Jobs().AsyncGetAll(cy_callback, timeout_ms)) - return fut + self.inner.get().Jobs().AsyncGetAll(MultiItemCpsHandler[CJobTableData](postprocess_async_get_all_job_info, complete_fut_ptr, + fut_ptr), timeout_ms)) + return asyncio.wrap_future(fut_ptr) ############################################################# # Runtime Env methods @@ -523,65 +532,102 @@ cdef class NewGcsClient: # Util functions for async handling -# Ideally we want to pass CRayStatus around. However it's not easy to wrap a -# `ray::Status` to a `PythonObject*` so we marshall it to a 3-tuple like this. It can be -# unmarshalled to CRayStatus with `to_c_ray_status`. -StatusParts = Tuple[int, str, int] - -cdef CRayStatus to_c_ray_status(tup: StatusParts): - cdef: - uint8_t code = tup[0] - CStatusCode status_code = (code) - c_string msg = tup[1] - int rpc_code = tup[2] - CRayStatus s - if status_code == CStatusCode_OK: - return CRayStatus.OK() - s = CRayStatus(status_code, msg, rpc_code) - return s - - -def check_status_parts(parts: StatusParts): - check_status_timeout_as_rpc_error(to_c_ray_status(parts)) - - -def check_status_or_return(tup: Tuple[StatusParts, Any]): - status_parts, val = tup - check_status_parts(status_parts) - return val - - -cdef make_future_and_callback(postprocess=None): - """ - Prepares a series of async call and returns (future, callback). - In runtime it's in this order: - - Async API invoked. - - if it returns non-OK, the async call raises. - - Async API invokes `callback`, in the C++ thread - - `callback` sends the result to the event loop thread and fulfill `fut`. - - `run_postprocess` awaits `fut`, invokes `postprocess` and fulfill `fut2`. - - `fut2` is what we return to the user. - - Params: - `postprocess` is a sync function that returns transformed value, may raise. - """ - loop = asyncio.get_event_loop() - fut = loop.create_future() - - def callback(result, exc): - # May run in in C++ thread - if fut.cancelled(): - return - if exc is not None: - loop.call_soon_threadsafe(fut.set_exception, exc) - else: - loop.call_soon_threadsafe(fut.set_result, result) - - async def run_postprocess(fut, postprocess): - result = await fut - if postprocess is None: - return result +cdef void* make_fut_ptr(): + fut = concurrent.futures.Future() + cpython.Py_INCREF(fut) + cdef void* fut_ptr = fut + return fut_ptr + +cdef void complete_fut_ptr(result, void* fut_ptr): + # concurrent.futures.Future + cdef fut = fut_ptr + + assert not fut.done() + try: + ret, exc = result + if exc: + fut.set_exception(exc) else: - return postprocess(result) - - return run_postprocess(fut, postprocess), callback + fut.set_result(ret) + finally: + cpython.Py_DECREF(fut) + +# Returns a Python object, or raises an exception. +cdef postprocess_async_get_all_job_info(CRayStatus status, c_vector[CJobTableData]&& c_data): + # -> Dict[JobID, gcs_pb2.JobTableData] + cdef c_string b + try: + check_status_timeout_as_rpc_error(status) + job_table_data = {} + for c_proto in c_data: + b = c_proto.SerializeAsString() + proto = gcs_pb2.JobTableData() + proto.ParseFromString(b) + job_table_data[proto.job_id] = proto + return job_table_data, None + except Exception as e: + return None, e + +cdef postprocess_optional_str_none_for_not_found(CRayStatus status, const optional[c_string]& c_str): + # -> Optional[bytes] + try: + if status.IsNotFound(): + return None, None + check_status_timeout_as_rpc_error(status) + return dereference(c_str), None + except Exception as e: + return None, e + +cdef postprocess_optional_multi_get(CRayStatus status, const optional[unordered_map[c_string, c_string]]& c_map): + # -> Dict[str, str] + cdef unordered_map[c_string, c_string].const_iterator it + try: + check_status_timeout_as_rpc_error(status) + result = {} + it = dereference(c_map).const_begin() + while it != dereference(c_map).const_end(): + key = dereference(it).first + value = dereference(it).second + result[key] = value + postincrement(it) + return result, None + except Exception as e: + return None, e + +cdef postprocess_optional_int(CRayStatus status, const optional[int]& c_int): + # -> int + try: + check_status_timeout_as_rpc_error(status) + return c_int.value(), None + except Exception as e: + return None, e + +cdef postprocess_optional_vector_str(CRayStatus status, const optional[c_vector[c_string]]& c_vec): + # -> Dict[str, str] + try: + check_status_timeout_as_rpc_error(status) + except Exception as e: + return None, e + cdef const c_vector[c_string]* vec = &c_vec.value() + cdef c_vector[c_string].const_iterator it = dereference(vec).const_begin() + result = [] + while it != dereference(vec).const_end(): + result.append(dereference(it)) + postincrement(it) + # // result = [s for s in c_vec.value()] + return result, None +cdef postprocess_optional_bool(CRayStatus status, const optional[c_bool]& b): + # -> bool + try: + check_status_timeout_as_rpc_error(status) + except Exception as e: + return None, e + return b.value(), None + +cdef postprocess_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): + # -> List[bool] + try: + check_status_timeout_as_rpc_error(status) + return [b for b in c_data], None + except Exception as e: + return None, e diff --git a/src/ray/gcs/gcs_client/python_callbacks.h b/src/ray/gcs/gcs_client/python_callbacks.h index 4d340511055a..9dbfa0215044 100644 --- a/src/ray/gcs/gcs_client/python_callbacks.h +++ b/src/ray/gcs/gcs_client/python_callbacks.h @@ -16,340 +16,83 @@ #include -#include -#include #include -#include #include -#include "absl/container/flat_hash_map.h" -#include "ray/common/status.h" #include "ray/util/logging.h" namespace ray { +namespace gcs { -//// Converters -// Converts C++ types to PyObject*. -// None of the converters hold the GIL, but all of them requires the GIL to be held. -// This means the caller should hold the GIL before calling these functions. -// -// A Converter is a functor that takes a C++ type and returns a PyObject*. -// The conversion may fail, in which case the converter should set an exception and return -// nullptr. -// -// By default you can use `DefaultConverter::convert` to convert any type. If you need -// special handling you can compose out your own. -class BytesConverter { - // Serializes the message to a string. Returns false if the serialization fails. - // template - // static bool serialize(const T &message, std::string &result); - - // Specialization for types with a SerializeToString method - template - static bool serialize(const Message &message, - std::string &result, - typename std::enable_if::value>::type * = 0) { - return message.SerializeToString(&result); - } - - // Specialization for types with a Binary method, i.e. `BaseID`s - // Never fails. - template - static bool serialize( - const ID &id, - std::string &result, - typename std::enable_if< - std::is_member_function_pointer::value>::type * = 0) { - result = id.Binary(); - return true; - } - - public: - template - static PyObject *convert(const T &arg) { - std::string serialized_message; - if (serialize(arg, serialized_message)) { - return PyBytes_FromStringAndSize(serialized_message.data(), - serialized_message.size()); - } else { - PyErr_SetString(PyExc_ValueError, "Failed to serialize message."); - Py_RETURN_NONE; - } - } - - static PyObject *convert(const std::string &arg) { - return PyBytes_FromStringAndSize(arg.data(), arg.size()); - } -}; - -class BoolConverter { - public: - static PyObject *convert(bool arg) { return PyBool_FromLong(arg); } -}; - -class IntConverter { - public: - static PyObject *convert(int arg) { return PyLong_FromLong(arg); } -}; - -template -class OptionalConverter { - public: - template - static PyObject *convert(const boost::optional &arg) { - if (arg) { - return Inner::convert(*arg); - } else { - Py_RETURN_NONE; - } - } -}; - -template -class VectorConverter { - public: - template - static PyObject *convert(const std::vector &arg) { - PyObject *list = PyList_New(arg.size()); - for (size_t i = 0; i < arg.size(); i++) { - PyObject *item = Inner::convert(arg[i]); - if (item == nullptr) { - // Failed to convert an item. Free the list and all items within. - Py_DECREF(list); - return nullptr; - } - PyList_SetItem(list, i, Inner::convert(arg[i])); - } - return list; - } -}; - -// Returns a Python tuple of two elements. -template -class PairConverter { - public: - template - static PyObject *convert(const T &left, const U &right) { - PyObject *result = PyTuple_New(2); - PyObject *left_obj = Left::convert(left); - if (left_obj == nullptr) { - Py_DECREF(result); - return nullptr; - } - PyTuple_SetItem(result, 0, left_obj); - PyObject *right_obj = Right::convert(right); - if (right_obj == nullptr) { - Py_DECREF(result); - return nullptr; - } - PyTuple_SetItem(result, 1, right_obj); - return result; - } -}; - -template -class MapConverter { - public: - template - static PyObject *convert(const MapType &arg) { - PyObject *dict = PyDict_New(); - for (const auto &pair : arg) { - PyObject *key = KeyConverter::convert(pair.first); - if (key == nullptr) { - Py_DECREF(dict); - return nullptr; - } - PyObject *value = ValueConverter::convert(pair.second); - if (value == nullptr) { - Py_DECREF(key); - Py_DECREF(dict); - return nullptr; - } - if (PyDict_SetItem(dict, key, value) < 0) { - Py_DECREF(key); - Py_DECREF(value); - Py_DECREF(dict); - return nullptr; - } - Py_XDECREF(key); - Py_XDECREF(value); - } - return dict; - } -}; - -// Converts ray::Status to Tuple[StatusCode, error_message, rpc_code] -class StatusConverter { +class PythonGilHolder { public: - static PyObject *convert(const ray::Status &status) { - static_assert(std::is_same_v>, - "StatusCode underlying type should be char."); - return PyTuple_Pack(3, - IntConverter::convert(static_cast(status.code())), - BytesConverter::convert(status.message()), - IntConverter::convert(status.rpc_code())); - } -}; + PythonGilHolder() : state_(PyGILState_Ensure()) {} + ~PythonGilHolder() { PyGILState_Release(state_); } -// Default converter, converts all types implemented above. -// Resolution: -// - single bool, int, status: BoolConverter, IntConverter, StatusConverter -// - optional: OptionalConverter -// - vector: VectorConverter -// - single generic argument: BytesConverter -// - 2 args (T, U): PairConverter -// - map: MapConverter (we can't do generics over MapType for -// collision w/ BytesConverter, so we specialize for std::unordered_map and -// absl::flat_hash_map) -class DefaultConverter { - public: - static PyObject *convert(bool arg) { return BoolConverter::convert(arg); } - static PyObject *convert(int arg) { return IntConverter::convert(arg); } - static PyObject *convert(const Status &arg) { return StatusConverter::convert(arg); } - - template - static PyObject *convert(const T &arg) { - return BytesConverter::convert(arg); - } - template - static PyObject *convert(const boost::optional &arg) { - return OptionalConverter::convert(arg); - } - template - static PyObject *convert(const std::vector &arg) { - return VectorConverter::convert(arg); - } - template - static PyObject *convert(const T &left, const U &right) { - return PairConverter::convert(left, right); - } - template - static PyObject *convert(const std::unordered_map &arg) { - return MapConverter::convert(arg); - } - template - static PyObject *convert(const absl::flat_hash_map &arg) { - return MapConverter::convert(arg); - } + private: + PyGILState_STATE state_; }; -// Wraps a Python `Callable[[T, Exception], None]` into a C++ `std::function`. -// This is a base class for all the callbacks, with subclass handling the conversion. -// The base class handles: -// - GIL management -// - PyObject Ref counting -// - Exception handling. +// Facility for async bindings. C++ APIs need a callback (`std::function`), +// and in the callback we want to do type conversion and complete the Python future. +// However, Cython can't wrap a Python callable to a stateful C++ std::function. // -// `Converter`: a functor that converts U to owned PyObject*. +// Fortunately, Cython can convert *pure* Cython functions to C++ function pointers. +// Hence we make this C++ Functor `CpsHandler` to wrap Python calls to C++ callbacks. // -// TODO: For all these we also need to handle exc. We do this by giving the py_callable -// another arg so it becomes Callable[[T, Exception], None]. The exception is a -// 3-tuple of (type, value, tb). -// TODO: Subscribe need iterator/generator. -template -class PyCallback { - private: - class PythonGilHolder { - public: - PythonGilHolder() : state_(PyGILState_Ensure()) {} - ~PythonGilHolder() { PyGILState_Release(state_); } - - private: - PyGILState_STATE state_; - }; - +// Different APIs have different type signatures, but the code of completing the future +// is the same. So we ask 2 Cython function pointers: `Handler` and `Continuation`. +// `Handler` is unique for each API, converting C++ types to Python types. `Continuation` +// is shared by all APIs, completing the Python future. +// +// One issue is the `Continuation` have to be stateless, so we need to keep the Future +// in the functor. But we don't want to expose the Future to C++ too much, so we keep +// it as a void*. For that, we Py_INCREF the Future in the functor and Py_DECREF it +// after the completion. +// +// On C++ async API calling: +// 1. Create a Future. +// 2. Creates a `CpsHandler` functor with `Handler` and `Continuation` and the Future. +// 3. Invokes the async API with the functor. +// +// On C++ async API completion: +// 1. The CpsHandler functor is called. It acquires GIL and: +// 2. The functor calls the Cython function `Handler` with C++ types. It returns +// `Tuple[result, exception]`. +// 3. The functor calls the Cython function `Continuation` with the tuple and the +// Future (as void*). It completes the Python future with the result or with the +// exception. +template +class CpsHandler { public: - // Ref counted Python object. - // This callback class is copyable, and as a std::function it's indeed copied around. - // But we don't want to hold the GIL and do ref counting so many times, so we use a - // shared_ptr to manage the ref count for us, and only call Py_XINCREF once. - // - // Note this ref counted is only from C++ side. If this ref count goes to 0, we only - // Issue 1 decref, which does not necessarily free the object. - std::shared_ptr py_callable; + // The Handler is a Cython function that takes Args... and returns a PyObject*. + // It must not raise exceptions. + // The return PyObject* is passed to the Continuation. + using Handler = PyObject *(*)(Args...); + // It must not raise exceptions. + using Continuation = void (*)(PyObject *, void *); - // Needed by Cython to place a placeholder object. - PyCallback() {} + CpsHandler(Handler handler, Continuation continuation, void *context) + : handler(handler), continuation(continuation), context(context) {} - PyCallback(PyObject *callable) { - if (callable == nullptr) { - py_callable = nullptr; - return; - } + void operator()(Args &&...args) { PythonGilHolder gil; - Py_XINCREF(callable); - - py_callable = std::shared_ptr(callable, [](PyObject *obj) { - PythonGilHolder gil; - Py_XDECREF(obj); - }); + PyObject *result = handler(std::forward(args)...); + continuation(result, context); } - // Typically Args is just a single argument, but we allow multiple arguments for e.g. - // (Status, boost::optional). - // - // Converts the arguments to a Python Object and may raise exceptions. - // Invokes the Python callable with (converted_arg, None), or (None, exception). - // - // The callback should not return anything and should not raise exceptions. If it - // raises, we catch it and log it. - template - void operator()(const Args &...args) { - if (py_callable == nullptr) { - return; - } - - // Hold the GIL. - PythonGilHolder gil; - PyObject *arg_obj = Converter::convert(args...); - if (arg_obj == nullptr) { - // Failed to convert the arguments. The exception is set by the converter. - PyObject *exc_obj = PyErr_Occurred(); - if (exc_obj == nullptr) { - // The converter didn't set an exception? - RAY_LOG(WARNING) << "Failed to convert arguments, but no exception set."; - PyErr_SetString(PyExc_ValueError, "Failed to convert arguments."); - exc_obj = PyErr_Occurred(); - } - PyObject *result = - PyObject_CallFunctionObjArgs(py_callable.get(), Py_None, exc_obj, NULL); - Py_XDECREF(result); - } else { - PyObject *result = - PyObject_CallFunctionObjArgs(py_callable.get(), arg_obj, Py_None, NULL); - Py_XDECREF(arg_obj); - Py_XDECREF(result); - } - if (PyErr_Occurred()) { - // Our binding code raised exceptions. Not much we can do here. Print it out. - PyObject *ptype, *pvalue, *ptraceback; - PyErr_Fetch(&ptype, &pvalue, &ptraceback); - PyErr_NormalizeException(&ptype, &pvalue, &ptraceback); - PyObject *str_exc = PyObject_Str(pvalue); - const char *exc_str = PyUnicode_AsUTF8(str_exc); - - RAY_LOG(ERROR) << "Python exception in cpython binding callback: " << exc_str; + private: + Handler handler = nullptr; + Continuation continuation = nullptr; + void *context = nullptr; +}; - // Clean up - Py_XDECREF(ptype); - Py_XDECREF(pvalue); - Py_XDECREF(ptraceback); - Py_XDECREF(str_exc); +template +using MultiItemCpsHandler = CpsHandler &&>; - PyErr_Clear(); - } - } -}; +template +using OptionalItemCpsHandler = CpsHandler &>; -// Concrete callback types. -// Most types are using the DefaultConverter, but we allow specialization for some types. -using PyDefaultCallback = PyCallback; -// Specialization for a pair of (Status, Vector). -// We need this because `MultiItemCallback` uses (Status, std::vector&&) not const ref. -// and C++ can't deduce it well (will implicit convert the vector&& to bool) -template -using PyMultiItemCallback = - PyCallback>>; +} // namespace gcs } // namespace ray From 9b71a70ca1f7cad5976817803cb7dfcca9e3c309 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 25 Jul 2024 00:32:00 -0700 Subject: [PATCH 03/10] better comments Signed-off-by: Ruiyang Wang --- BUILD.bazel | 11 +++++------ python/ray/includes/common.pxd | 1 - python/ray/includes/gcs_client.pxi | 27 ++------------------------- 3 files changed, 7 insertions(+), 32 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index b6a17084f459..b1693cea36f5 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -7,14 +7,14 @@ # If you would like to help with the move in your PR, please use `git mv` so that the history of the file is retained. load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") -load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library") -load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") -load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library") -load("@python3_9//:defs.bzl", python39 = "interpreter") -load("@rules_cc//cc:defs.bzl", "cc_proto_library") load("@rules_proto//proto:defs.bzl", "proto_library") load("@rules_python//python:defs.bzl", "py_library", "py_runtime", "py_runtime_pair") +load("@rules_cc//cc:defs.bzl", "cc_proto_library") +load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") +load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library") +load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library") load("//bazel:ray.bzl", "COPTS", "PYX_COPTS", "PYX_SRCS", "copy_to_workspace", "ray_cc_binary", "ray_cc_library", "ray_cc_test") +load("@python3_9//:defs.bzl", python39 = "interpreter") package( default_visibility = ["//visibility:public"], @@ -2364,7 +2364,6 @@ flatbuffer_cc_library( ray_cc_library( name = "python_callbacks", hdrs = [ - "src/ray/gcs/callback.h", "src/ray/gcs/gcs_client/python_callbacks.h", ], ) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 6fecfc10e68b..5fab0280b73a 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -6,7 +6,6 @@ from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t, uint32_t from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector from libcpp.pair cimport pair as c_pair -from libcpp.functional cimport function from ray.includes.optional cimport ( optional, ) diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 42c03d65008c..b973b8be512e 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -14,32 +14,9 @@ Binding of C++ ray::gcs::GcsClient. # # We need to best-effort import everything we need. # -# Implementation Notes: -# -# Async API -# -# One challenge is that the C++ async API is callback-based, and the callbacks are -# invoked in the C++ threads. In `make_future_and_callback` we create a future and a -# callback, and the callback will fulfill the future in the event loop thread. The -# future is returned to Python to await, and the callback is passed to the C++ async -# API. Once C++ async API invokes the callback, the future is fulfilled in the Python -# event loop thread. -# -# Marshalling -# -# The C++ API returns ints, strings, `ray::Status` and C++ protobuf types. We need to -# convert them to Python types. In `python_callbacks.h` we define a series of converters -# with Cpython APIs: -# -# - bools, ints and strings are converted using `PyBool_FromLong` and alike. -# - `ray::Status` is marshalled to a 3-tuple and unmarshall it back to `CRayStatus` via -# `to_c_ray_status`. -# - C++ protobuf types are serialized them to bytes, passed to Python and deserialized -# in the Python `postprocess` functions. Later if we need performance for specific -# methods we can add a custom Converter in `python_callbacks.h`. - +# For how async API are implemented, see src/ray/gcs/gcs_client/python_callbacks.h from asyncio import Future -from typing import List, Dict, Any, Tuple, Optional, Callable +from typing import List from libcpp.utility cimport move import concurrent.futures from ray.includes.common cimport ( From d4ccfdc539c95f68de08bed6ff3a364c039da189 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 25 Jul 2024 16:35:38 -0700 Subject: [PATCH 04/10] update naming and how fut_ptr works Signed-off-by: Ruiyang Wang --- python/ray/includes/common.pxd | 56 ++++---------- python/ray/includes/gcs_client.pxi | 92 ++++++++++++----------- src/ray/gcs/gcs_client/python_callbacks.h | 58 ++++++++------ 3 files changed, 97 insertions(+), 109 deletions(-) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 5fab0280b73a..452a3400d401 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -368,41 +368,11 @@ cdef extern from "ray/core_worker/common.h" nogil: const c_bool GetDidSpill() const cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs": - # Facility for async bindings. C++ APIs need a callback (`std::function`), - # and in the callback we want to do type conversion and complete the Python future. - # However, Cython can't wrap a Python callable to a stateful C++ std::function. - - # Fortunately, Cython can convert *pure* Cython functions to C++ function pointers. - # Hence we make this C++ Functor `CpsHandler` to wrap Python calls to C++ callbacks. - - # Different APIs have different type signatures, but the code of completing the future - # is the same. So we ask 2 Cython function pointers: `Handler` and `Continuation`. - # `Handler` is unique for each API, converting C++ types to Python types. `Continuation` - # is shared by all APIs, completing the Python future. - - # One issue is the `Continuation` have to be stateless, so we need to keep the Future - # in the functor. But we don't want to expose the Future to C++ too much, so we keep - # it as a void*. For that, we Py_INCREF the Future in the functor and Py_DECREF it - # after the completion. - - # On C++ async API calling: - # 1. Create a Future. - # 2. Creates a `CpsHandler` functor with `Handler` and `Continuation` and the Future. - # 3. Invokes the async API with the functor. - - # On C++ async API completion: - # 1. The CpsHandler functor is called. It acquires GIL and: - # 2. The functor calls the Cython function `Handler` with C++ types. It returns - # # `Tuple[result, exception]`. - # 3. The functor calls the Cython function `Continuation` with the tuple and the - # # Future (as void*). It completes the Python future with the result or with the - # # exception. - - cdef cppclass MultiItemCpsHandler[T]: - MultiItemCpsHandler(object (*)(CRayStatus, c_vector[T] &&) , void (object, void*), void*) nogil - - cdef cppclass OptionalItemCpsHandler[T]: - OptionalItemCpsHandler(object (*)(CRayStatus, const optional[T]&) , void (object, void*), void*) nogil + cdef cppclass MultiItemPyCallback[T]: + MultiItemPyCallback(object (*)(CRayStatus, c_vector[T] &&) , void (object, void*), void*) nogil + + cdef cppclass OptionalItemPyCallback[T]: + OptionalItemPyCallback(object (*)(CRayStatus, const optional[T]&) , void (object, void*), void*) nogil cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor": @@ -414,7 +384,7 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: int64_t timeout_ms) CRayStatus AsyncGetAll( - const MultiItemCpsHandler[CJobTableData] &callback, + const MultiItemPyCallback[CJobTableData] &callback, int64_t timeout_ms) cdef cppclass CNodeInfoAccessor "ray::gcs::NodeInfoAccessor": @@ -426,7 +396,7 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: CRayStatus AsyncCheckAlive( const c_vector[c_string] &raylet_addresses, int64_t timeout_ms, - const MultiItemCpsHandler[c_bool] &callback) + const MultiItemPyCallback[c_bool] &callback) CRayStatus DrainNodes( const c_vector[CNodeID] &node_ids, @@ -486,19 +456,19 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: const c_string &ns, const c_string &prefix, int64_t timeout_ms, - const OptionalItemCpsHandler[c_vector[c_string]] &callback) + const OptionalItemPyCallback[c_vector[c_string]] &callback) CRayStatus AsyncInternalKVGet( const c_string &ns, const c_string &key, int64_t timeout_ms, - const OptionalItemCpsHandler[c_string] &callback) + const OptionalItemPyCallback[c_string] &callback) CRayStatus AsyncInternalKVMultiGet( const c_string &ns, const c_vector[c_string] &keys, int64_t timeout_ms, - const OptionalItemCpsHandler[unordered_map[c_string, c_string]] &callback) + const OptionalItemPyCallback[unordered_map[c_string, c_string]] &callback) CRayStatus AsyncInternalKVPut( const c_string &ns, @@ -506,20 +476,20 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: const c_string &value, c_bool overwrite, int64_t timeout_ms, - const OptionalItemCpsHandler[int] &callback) + const OptionalItemPyCallback[int] &callback) CRayStatus AsyncInternalKVExists( const c_string &ns, const c_string &key, int64_t timeout_ms, - const OptionalItemCpsHandler[c_bool] &callback) + const OptionalItemPyCallback[c_bool] &callback) CRayStatus AsyncInternalKVDel( const c_string &ns, const c_string &key, c_bool del_by_prefix, int64_t timeout_ms, - const OptionalItemCpsHandler[int] &callback) + const OptionalItemPyCallback[int] &callback) cdef cppclass CRuntimeEnvAccessor "ray::gcs::RuntimeEnvAccessor": CRayStatus PinRuntimeEnvUri( diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index b973b8be512e..f28f3e75f571 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -25,8 +25,8 @@ from ray.includes.common cimport ( ConnectOnSingletonIoContext, CStatusCode, CStatusCode_OK, - MultiItemCpsHandler, - OptionalItemCpsHandler, + MultiItemPyCallback, + OptionalItemPyCallback, ) from ray.includes.optional cimport optional from ray.core.generated import gcs_pb2 @@ -190,16 +190,17 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - void* fut_ptr = make_fut_ptr() + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVGet( ns, key, timeout_ms, - OptionalItemCpsHandler[c_string]( + OptionalItemPyCallback[c_string]( postprocess_optional_str_none_for_not_found, - complete_fut_ptr, + assign_and_decrement, fut_ptr))) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) def async_internal_kv_multi_get( self, keys: List[bytes], namespace=None, timeout=None @@ -208,16 +209,17 @@ cdef class NewGcsClient: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_keys = [key for key in keys] - void* fut_ptr = make_fut_ptr() + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVMultiGet( ns, c_keys, timeout_ms, - OptionalItemCpsHandler[unordered_map[c_string, c_string]]( + OptionalItemPyCallback[unordered_map[c_string, c_string]]( postprocess_optional_multi_get, - complete_fut_ptr, + assign_and_decrement, fut_ptr))) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) def async_internal_kv_put( self, c_string key, c_string value, c_bool overwrite=False, namespace=None, @@ -226,64 +228,68 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - void* fut_ptr = make_fut_ptr() + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVPut( ns, key, value, overwrite, timeout_ms, - OptionalItemCpsHandler[int]( + OptionalItemPyCallback[int]( postprocess_optional_int, - complete_fut_ptr, + assign_and_decrement, fut_ptr))) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) def async_internal_kv_del(self, c_string key, c_bool del_by_prefix, namespace=None, timeout=None) -> Future[int]: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - void* fut_ptr = make_fut_ptr() + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVDel( ns, key, del_by_prefix, timeout_ms, - OptionalItemCpsHandler[int]( + OptionalItemPyCallback[int]( postprocess_optional_int, - complete_fut_ptr, + assign_and_decrement, fut_ptr))) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) def async_internal_kv_keys(self, c_string prefix, namespace=None, timeout=None ) -> Future[List[bytes]]: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - void* fut_ptr = make_fut_ptr() + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVKeys( ns, prefix, timeout_ms, - OptionalItemCpsHandler[c_vector[c_string]]( + OptionalItemPyCallback[c_vector[c_string]]( postprocess_optional_vector_str, - complete_fut_ptr, + assign_and_decrement, fut_ptr))) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) def async_internal_kv_exists(self, c_string key, namespace=None, timeout=None ) -> Future[bool]: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - void* fut_ptr = make_fut_ptr() + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( self.inner.get().InternalKV().AsyncInternalKVExists( ns, key, timeout_ms, - OptionalItemCpsHandler[c_bool]( + OptionalItemPyCallback[c_bool]( postprocess_optional_bool, - complete_fut_ptr, + assign_and_decrement, fut_ptr))) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) ############################################################# @@ -308,16 +314,17 @@ cdef class NewGcsClient: cdef: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_node_ips = [ip for ip in node_ips] - void* fut_ptr = make_fut_ptr() + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( self.inner.get().Nodes().AsyncCheckAlive( c_node_ips, timeout_ms, - MultiItemCpsHandler[c_bool]( + MultiItemPyCallback[c_bool]( &postprocess_multi_bool, - complete_fut_ptr, + assign_and_decrement, fut_ptr))) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) def drain_nodes( self, node_ids: List[bytes], timeout: Optional[float] = None @@ -399,14 +406,16 @@ cdef class NewGcsClient: def async_get_all_job_info( self, timeout: Optional[float] = None ) -> Future[Dict[str, gcs_pb2.JobTableData]]: - cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - cdef void* fut_ptr = make_fut_ptr() + cdef: + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( - self.inner.get().Jobs().AsyncGetAll(MultiItemCpsHandler[CJobTableData](postprocess_async_get_all_job_info, complete_fut_ptr, + self.inner.get().Jobs().AsyncGetAll(MultiItemPyCallback[CJobTableData](postprocess_async_get_all_job_info, assign_and_decrement, fut_ptr), timeout_ms)) - return asyncio.wrap_future(fut_ptr) + return asyncio.wrap_future(fut) ############################################################# # Runtime Env methods @@ -509,15 +518,14 @@ cdef class NewGcsClient: # Util functions for async handling -cdef void* make_fut_ptr(): +cdef incremented_fut(): fut = concurrent.futures.Future() cpython.Py_INCREF(fut) - cdef void* fut_ptr = fut - return fut_ptr + return fut -cdef void complete_fut_ptr(result, void* fut_ptr): - # concurrent.futures.Future +cdef void assign_and_decrement(result, void* fut_ptr): cdef fut = fut_ptr + assert isinstance(fut, concurrent.futures.Future) assert not fut.done() try: @@ -551,7 +559,7 @@ cdef postprocess_optional_str_none_for_not_found(CRayStatus status, const option if status.IsNotFound(): return None, None check_status_timeout_as_rpc_error(status) - return dereference(c_str), None + return c_str.value(), None except Exception as e: return None, e @@ -561,8 +569,8 @@ cdef postprocess_optional_multi_get(CRayStatus status, const optional[unordered_ try: check_status_timeout_as_rpc_error(status) result = {} - it = dereference(c_map).const_begin() - while it != dereference(c_map).const_end(): + it = c_map.value().const_begin() + while it != c_map.value().const_end(): key = dereference(it).first value = dereference(it).second result[key] = value diff --git a/src/ray/gcs/gcs_client/python_callbacks.h b/src/ray/gcs/gcs_client/python_callbacks.h index 9dbfa0215044..b85591e98e3e 100644 --- a/src/ray/gcs/gcs_client/python_callbacks.h +++ b/src/ray/gcs/gcs_client/python_callbacks.h @@ -38,60 +38,70 @@ class PythonGilHolder { // However, Cython can't wrap a Python callable to a stateful C++ std::function. // // Fortunately, Cython can convert *pure* Cython functions to C++ function pointers. -// Hence we make this C++ Functor `CpsHandler` to wrap Python calls to C++ callbacks. +// Hence we make this C++ Functor `PyCallback` to wrap Python calls to C++ callbacks. // // Different APIs have different type signatures, but the code of completing the future -// is the same. So we ask 2 Cython function pointers: `Handler` and `Continuation`. -// `Handler` is unique for each API, converting C++ types to Python types. `Continuation` -// is shared by all APIs, completing the Python future. +// is the same. So we ask 2 Cython function pointers: `Converter` and `Assigner`. +// `Converter` is unique for each API, converting C++ types to Python types. +// `Assigner` is shared by all APIs, completing the Python future. // -// One issue is the `Continuation` have to be stateless, so we need to keep the Future +// One issue is the `Assigner` have to be stateless, so we need to keep the Future // in the functor. But we don't want to expose the Future to C++ too much, so we keep // it as a void*. For that, we Py_INCREF the Future in the functor and Py_DECREF it // after the completion. // // On C++ async API calling: // 1. Create a Future. -// 2. Creates a `CpsHandler` functor with `Handler` and `Continuation` and the Future. +// 2. Creates a `PyCallback` functor with `Converter` and `Assigner` and the Future. // 3. Invokes the async API with the functor. // // On C++ async API completion: -// 1. The CpsHandler functor is called. It acquires GIL and: -// 2. The functor calls the Cython function `Handler` with C++ types. It returns +// 1. The PyCallback functor is called. It acquires GIL and: +// 2. The functor calls the Cython function `Converter` with C++ types. It returns // `Tuple[result, exception]`. -// 3. The functor calls the Cython function `Continuation` with the tuple and the -// Future (as void*). It completes the Python future with the result or with the -// exception. +// 3. The functor calls the Cython function `Assigner` with the tuple and the +// Future (as void*). It assign the result or the exception to the Python future. template -class CpsHandler { +class PyCallback { public: - // The Handler is a Cython function that takes Args... and returns a PyObject*. + // The Converter is a Cython function that takes Args... and returns a PyObject*. // It must not raise exceptions. - // The return PyObject* is passed to the Continuation. - using Handler = PyObject *(*)(Args...); + // The return PyObject* is passed to the Assigner. + using Converter = PyObject *(*)(Args...); // It must not raise exceptions. - using Continuation = void (*)(PyObject *, void *); + using Assigner = void (*)(PyObject *, void *); - CpsHandler(Handler handler, Continuation continuation, void *context) - : handler(handler), continuation(continuation), context(context) {} + PyCallback(Converter converter, Assigner assigner, void *context) + : converter(converter), assigner(assigner), context(context) {} void operator()(Args &&...args) { PythonGilHolder gil; - PyObject *result = handler(std::forward(args)...); - continuation(result, context); + PyObject *result = converter(std::forward(args)...); + CheckNoException(); + + assigner(result, context); + CheckNoException(); + } + + void CheckNoException() { + if (PyErr_Occurred() != nullptr) { + PyErr_Print(); + PyErr_Clear(); + RAY_LOG(FATAL) << "Python exception occurred in async binding code, exiting!"; + } } private: - Handler handler = nullptr; - Continuation continuation = nullptr; + Converter converter = nullptr; + Assigner assigner = nullptr; void *context = nullptr; }; template -using MultiItemCpsHandler = CpsHandler &&>; +using MultiItemPyCallback = PyCallback &&>; template -using OptionalItemCpsHandler = CpsHandler &>; +using OptionalItemPyCallback = PyCallback &>; } // namespace gcs From 7d630fd2b7eb5e8ee783d1ed9d0fa409ad40a39e Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 25 Jul 2024 16:39:12 -0700 Subject: [PATCH 05/10] lint Signed-off-by: Ruiyang Wang --- python/ray/includes/common.pxd | 8 ++++++-- python/ray/includes/gcs_client.pxi | 21 ++++++++++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 452a3400d401..73a9a8c447df 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -369,10 +369,14 @@ cdef extern from "ray/core_worker/common.h" nogil: cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs": cdef cppclass MultiItemPyCallback[T]: - MultiItemPyCallback(object (*)(CRayStatus, c_vector[T] &&) , void (object, void*), void*) nogil + MultiItemPyCallback( + object (*)(CRayStatus, c_vector[T] &&), + void (object, void*), void*) nogil cdef cppclass OptionalItemPyCallback[T]: - OptionalItemPyCallback(object (*)(CRayStatus, const optional[T]&) , void (object, void*), void*) nogil + OptionalItemPyCallback( + object (*)(CRayStatus, const optional[T]&), + void (object, void*), void*) nogil cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor": diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index f28f3e75f571..aafc2ffdd09b 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -291,7 +291,6 @@ cdef class NewGcsClient: fut_ptr))) return asyncio.wrap_future(fut) - ############################################################# # NodeInfo methods ############################################################# @@ -413,8 +412,12 @@ cdef class NewGcsClient: with nogil: check_status_timeout_as_rpc_error( - self.inner.get().Jobs().AsyncGetAll(MultiItemPyCallback[CJobTableData](postprocess_async_get_all_job_info, assign_and_decrement, - fut_ptr), timeout_ms)) + self.inner.get().Jobs().AsyncGetAll( + MultiItemPyCallback[CJobTableData]( + postprocess_async_get_all_job_info, + assign_and_decrement, + fut_ptr), + timeout_ms)) return asyncio.wrap_future(fut) ############################################################# @@ -538,7 +541,8 @@ cdef void assign_and_decrement(result, void* fut_ptr): cpython.Py_DECREF(fut) # Returns a Python object, or raises an exception. -cdef postprocess_async_get_all_job_info(CRayStatus status, c_vector[CJobTableData]&& c_data): +cdef postprocess_async_get_all_job_info( + CRayStatus status, c_vector[CJobTableData]&& c_data): # -> Dict[JobID, gcs_pb2.JobTableData] cdef c_string b try: @@ -553,7 +557,8 @@ cdef postprocess_async_get_all_job_info(CRayStatus status, c_vector[CJobTableDat except Exception as e: return None, e -cdef postprocess_optional_str_none_for_not_found(CRayStatus status, const optional[c_string]& c_str): +cdef postprocess_optional_str_none_for_not_found( + CRayStatus status, const optional[c_string]& c_str): # -> Optional[bytes] try: if status.IsNotFound(): @@ -563,7 +568,8 @@ cdef postprocess_optional_str_none_for_not_found(CRayStatus status, const option except Exception as e: return None, e -cdef postprocess_optional_multi_get(CRayStatus status, const optional[unordered_map[c_string, c_string]]& c_map): +cdef postprocess_optional_multi_get( + CRayStatus status, const optional[unordered_map[c_string, c_string]]& c_map): # -> Dict[str, str] cdef unordered_map[c_string, c_string].const_iterator it try: @@ -587,7 +593,8 @@ cdef postprocess_optional_int(CRayStatus status, const optional[int]& c_int): except Exception as e: return None, e -cdef postprocess_optional_vector_str(CRayStatus status, const optional[c_vector[c_string]]& c_vec): +cdef postprocess_optional_vector_str( + CRayStatus status, const optional[c_vector[c_string]]& c_vec): # -> Dict[str, str] try: check_status_timeout_as_rpc_error(status) From 40459c1ddfaef8c2482a80c25331e829cf687b9f Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 26 Jul 2024 14:52:04 -0700 Subject: [PATCH 06/10] naming Signed-off-by: Ruiyang Wang --- python/ray/includes/gcs_client.pxi | 83 +++++++++++++++++------------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index aafc2ffdd09b..14000ca6c57f 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -197,8 +197,8 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVGet( ns, key, timeout_ms, OptionalItemPyCallback[c_string]( - postprocess_optional_str_none_for_not_found, - assign_and_decrement, + convert_optional_str_none_for_not_found, + assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -216,8 +216,8 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVMultiGet( ns, c_keys, timeout_ms, OptionalItemPyCallback[unordered_map[c_string, c_string]]( - postprocess_optional_multi_get, - assign_and_decrement, + convert_optional_multi_get, + assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -235,8 +235,8 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVPut( ns, key, value, overwrite, timeout_ms, OptionalItemPyCallback[int]( - postprocess_optional_int, - assign_and_decrement, + convert_optional_int, + assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -252,8 +252,8 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVDel( ns, key, del_by_prefix, timeout_ms, OptionalItemPyCallback[int]( - postprocess_optional_int, - assign_and_decrement, + convert_optional_int, + assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -269,8 +269,8 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVKeys( ns, prefix, timeout_ms, OptionalItemPyCallback[c_vector[c_string]]( - postprocess_optional_vector_str, - assign_and_decrement, + convert_optional_vector_str, + assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -286,8 +286,8 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVExists( ns, key, timeout_ms, OptionalItemPyCallback[c_bool]( - postprocess_optional_bool, - assign_and_decrement, + convert_optional_bool, + assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -320,8 +320,8 @@ cdef class NewGcsClient: self.inner.get().Nodes().AsyncCheckAlive( c_node_ips, timeout_ms, MultiItemPyCallback[c_bool]( - &postprocess_multi_bool, - assign_and_decrement, + &convert_multi_bool, + assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -387,20 +387,12 @@ cdef class NewGcsClient: self, timeout: Optional[float] = None ) -> Dict[JobID, gcs_pb2.JobTableData]: cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + cdef CRayStatus status cdef c_vector[CJobTableData] reply cdef c_vector[c_string] serialized_reply with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().Jobs().GetAll(reply, timeout_ms) - ) - for i in range(reply.size()): - serialized_reply.push_back(reply[i].SerializeAsString()) - ret = {} - for serialized in serialized_reply: - proto = gcs_pb2.JobTableData() - proto.ParseFromString(serialized) - ret[JobID.from_binary(proto.job_id)] = proto - return ret + status = self.inner.get().Jobs().GetAll(reply, timeout_ms) + return raise_or_return((convert_get_all_job_info(status, move(reply)))) def async_get_all_job_info( self, timeout: Optional[float] = None @@ -414,8 +406,8 @@ cdef class NewGcsClient: check_status_timeout_as_rpc_error( self.inner.get().Jobs().AsyncGetAll( MultiItemPyCallback[CJobTableData]( - postprocess_async_get_all_job_info, - assign_and_decrement, + convert_get_all_job_info, + assign_and_decrement_fut, fut_ptr), timeout_ms)) return asyncio.wrap_future(fut) @@ -526,7 +518,7 @@ cdef incremented_fut(): cpython.Py_INCREF(fut) return fut -cdef void assign_and_decrement(result, void* fut_ptr): +cdef void assign_and_decrement_fut(result, void* fut_ptr): cdef fut = fut_ptr assert isinstance(fut, concurrent.futures.Future) @@ -540,8 +532,23 @@ cdef void assign_and_decrement(result, void* fut_ptr): finally: cpython.Py_DECREF(fut) -# Returns a Python object, or raises an exception. -cdef postprocess_async_get_all_job_info( +cdef raise_or_return(tup): + ret, exc = tup + if exc: + raise exc + return ret + +############################################################# +# Converter functions: C++ types -> Python types, use by both Sync and Async APIs. +# They have to be defined here as pure functions because a function pointer is passed +# to C++ for Async APIs. +# +# Each function accepts what the C++ callback passes, typically a Status and a value. +# Returns `Tuple[object, Optional[Exception]]` (we are all gophers now lol). +# Must not raise exceptions, or it crashes the process. +############################################################# + +cdef convert_get_all_job_info( CRayStatus status, c_vector[CJobTableData]&& c_data): # -> Dict[JobID, gcs_pb2.JobTableData] cdef c_string b @@ -557,8 +564,11 @@ cdef postprocess_async_get_all_job_info( except Exception as e: return None, e -cdef postprocess_optional_str_none_for_not_found( +cdef convert_optional_str_none_for_not_found( CRayStatus status, const optional[c_string]& c_str): + # If status is NotFound, return None. + # If status is OK, return the value. + # Else, raise exception. # -> Optional[bytes] try: if status.IsNotFound(): @@ -568,7 +578,7 @@ cdef postprocess_optional_str_none_for_not_found( except Exception as e: return None, e -cdef postprocess_optional_multi_get( +cdef convert_optional_multi_get( CRayStatus status, const optional[unordered_map[c_string, c_string]]& c_map): # -> Dict[str, str] cdef unordered_map[c_string, c_string].const_iterator it @@ -585,7 +595,7 @@ cdef postprocess_optional_multi_get( except Exception as e: return None, e -cdef postprocess_optional_int(CRayStatus status, const optional[int]& c_int): +cdef convert_optional_int(CRayStatus status, const optional[int]& c_int): # -> int try: check_status_timeout_as_rpc_error(status) @@ -593,7 +603,7 @@ cdef postprocess_optional_int(CRayStatus status, const optional[int]& c_int): except Exception as e: return None, e -cdef postprocess_optional_vector_str( +cdef convert_optional_vector_str( CRayStatus status, const optional[c_vector[c_string]]& c_vec): # -> Dict[str, str] try: @@ -606,9 +616,8 @@ cdef postprocess_optional_vector_str( while it != dereference(vec).const_end(): result.append(dereference(it)) postincrement(it) - # // result = [s for s in c_vec.value()] return result, None -cdef postprocess_optional_bool(CRayStatus status, const optional[c_bool]& b): +cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b): # -> bool try: check_status_timeout_as_rpc_error(status) @@ -616,7 +625,7 @@ cdef postprocess_optional_bool(CRayStatus status, const optional[c_bool]& b): return None, e return b.value(), None -cdef postprocess_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): +cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): # -> List[bool] try: check_status_timeout_as_rpc_error(status) From b878fe007d58735bc4b5d229f47ebf6c5bd65d8d Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 26 Jul 2024 15:47:10 -0700 Subject: [PATCH 07/10] merge sync and async converter code Signed-off-by: Ruiyang Wang --- python/ray/includes/gcs_client.pxi | 124 ++++++++++------------ src/ray/gcs/gcs_client/accessor.h | 1 + src/ray/gcs/gcs_client/python_callbacks.h | 17 +-- 3 files changed, 69 insertions(+), 73 deletions(-) diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 14000ca6c57f..17358d1496f4 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -88,15 +88,11 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_string value + optional[c_string] opt_value = c_string() CRayStatus status with nogil: - status = self.inner.get().InternalKV().Get(ns, key, timeout_ms, value) - if status.IsNotFound(): - return None - else: - check_status_timeout_as_rpc_error(status) - return value + status = self.inner.get().InternalKV().Get(ns, key, timeout_ms, opt_value.value()) + return raise_or_return(convert_optional_str_none_for_not_found(status, opt_value)) def internal_kv_multi_get( self, keys: List[bytes], namespace=None, timeout=None @@ -105,20 +101,11 @@ cdef class NewGcsClient: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_keys = [key for key in keys] - unordered_map[c_string, c_string] values + optional[unordered_map[c_string, c_string]] opt_values = unordered_map[c_string, c_string]() + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().InternalKV().MultiGet(ns, c_keys, timeout_ms, values) - ) - - result = {} - it = values.begin() - while it != values.end(): - key = dereference(it).first - value = dereference(it).second - result[key] = value - postincrement(it) - return result + status = self.inner.get().InternalKV().MultiGet(ns, c_keys, timeout_ms, opt_values.value()) + return raise_or_return(convert_optional_multi_get(status, opt_values)) def internal_kv_put(self, c_string key, c_string value, c_bool overwrite=False, namespace=None, timeout=None) -> int: @@ -128,13 +115,11 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_bool added = False + optional[c_bool] opt_added = 0 + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get() - .InternalKV() - .Put(ns, key, value, overwrite, timeout_ms, added) - ) + status = self.inner.get().InternalKV().Put(ns, key, value, overwrite, timeout_ms, opt_added.value()) + added = raise_or_return(convert_optional_bool(status, opt_added)) return 1 if added else 0 def internal_kv_del(self, c_string key, c_bool del_by_prefix, @@ -145,14 +130,11 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - int num_deleted = 0 + optional[int] opt_num_deleted = 0 + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get() - .InternalKV() - .Del(ns, key, del_by_prefix, timeout_ms, num_deleted) - ) - return num_deleted + status = self.inner.get().InternalKV().Del(ns, key, del_by_prefix, timeout_ms, opt_num_deleted.value()) + return raise_or_return(convert_optional_int(status, opt_num_deleted)) def internal_kv_keys( self, c_string prefix, namespace=None, timeout=None @@ -160,25 +142,21 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_vector[c_string] keys + optional[c_vector[c_string]] opt_keys = c_vector[c_string]() + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().InternalKV().Keys(ns, prefix, timeout_ms, keys) - ) - - result = [key for key in keys] - return result + status = self.inner.get().InternalKV().Keys(ns, prefix, timeout_ms, opt_keys.value()) + return raise_or_return(convert_optional_vector_str(status, opt_keys)) def internal_kv_exists(self, c_string key, namespace=None, timeout=None) -> bool: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_bool exists = False + optional[c_bool] opt_exists = 0 + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().InternalKV().Exists(ns, key, timeout_ms, exists) - ) - return exists + status = self.inner.get().InternalKV().Exists(ns, key, timeout_ms, opt_exists.value()) + return raise_or_return(convert_optional_bool(status, opt_exists)) ############################################################# # Internal KV async methods @@ -225,6 +203,8 @@ cdef class NewGcsClient: self, c_string key, c_string value, c_bool overwrite=False, namespace=None, timeout=None ) -> Future[int]: + # TODO(ryw): the sync `internal_kv_put` returns bool while this async version + # returns int. We should make them consistent. cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 @@ -301,11 +281,10 @@ cdef class NewGcsClient: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_node_ips = [ip for ip in node_ips] c_vector[c_bool] results + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().Nodes().CheckAlive(c_node_ips, timeout_ms, results) - ) - return [result for result in results] + status = self.inner.get().Nodes().CheckAlive(c_node_ips, timeout_ms, results) + return raise_or_return(convert_multi_bool(status, move(results))) def async_check_alive( self, node_ips: List[bytes], timeout: Optional[float] = None @@ -333,32 +312,22 @@ cdef class NewGcsClient: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[CNodeID] c_node_ids c_vector[c_string] results + CRayStatus status for node_id in node_ids: c_node_ids.push_back(CNodeID.FromBinary(node_id)) with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().Nodes().DrainNodes(c_node_ids, timeout_ms, results) - ) - return [result for result in results] + status = self.inner.get().Nodes().DrainNodes(c_node_ids, timeout_ms, results) + return raise_or_return(convert_multi_str(status, move(results))) def get_all_node_info( self, timeout: Optional[float] = None ) -> Dict[NodeID, gcs_pb2.GcsNodeInfo]: cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 cdef c_vector[CGcsNodeInfo] reply - cdef c_vector[c_string] serialized_reply + cdef CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().Nodes().GetAllNoCache(timeout_ms, reply) - ) - for node in reply: - serialized_reply.push_back(node.SerializeAsString()) - ret = {} - for serialized in serialized_reply: - proto = gcs_pb2.GcsNodeInfo() - proto.ParseFromString(serialized) - ret[NodeID.from_binary(proto.node_id)] = proto - return ret + status = self.inner.get().Nodes().GetAllNoCache(timeout_ms, reply) + return raise_or_return(convert_get_all_node_info(status, move(reply))) ############################################################# # NodeResources methods @@ -548,6 +517,22 @@ cdef raise_or_return(tup): # Must not raise exceptions, or it crashes the process. ############################################################# +cdef convert_get_all_node_info( + CRayStatus status, c_vector[CGcsNodeInfo]&& c_data): + # -> Dict[JobID, gcs_pb2.JobTableData] + cdef c_string b + try: + check_status_timeout_as_rpc_error(status) + node_table_data = {} + for c_proto in c_data: + b = c_proto.SerializeAsString() + proto = gcs_pb2.GcsNodeInfo() + proto.ParseFromString(b) + node_table_data[proto.node_id] = proto + return node_table_data, None + except Exception as e: + return None, e + cdef convert_get_all_job_info( CRayStatus status, c_vector[CJobTableData]&& c_data): # -> Dict[JobID, gcs_pb2.JobTableData] @@ -617,6 +602,7 @@ cdef convert_optional_vector_str( result.append(dereference(it)) postincrement(it) return result, None + cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b): # -> bool try: @@ -632,3 +618,11 @@ cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): return [b for b in c_data], None except Exception as e: return None, e + +cdef convert_multi_str(CRayStatus status, c_vector[c_string]&& c_data): + # -> List[bytes] + try: + check_status_timeout_as_rpc_error(status) + return [datum for datum in c_data], None + except Exception as e: + return None, e diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index e546790d596b..f9e299315c53 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -853,6 +853,7 @@ class InternalKVAccessor { /// \param added It's an output parameter. It'll be set to be true if /// any row is added. /// \return Status + /// TODO(ryw): change the out parameter type to `int` just like AsyncInternalKVPut. virtual Status Put(const std::string &ns, const std::string &key, const std::string &value, diff --git a/src/ray/gcs/gcs_client/python_callbacks.h b/src/ray/gcs/gcs_client/python_callbacks.h index b85591e98e3e..baabda221efc 100644 --- a/src/ray/gcs/gcs_client/python_callbacks.h +++ b/src/ray/gcs/gcs_client/python_callbacks.h @@ -35,21 +35,22 @@ class PythonGilHolder { // Facility for async bindings. C++ APIs need a callback (`std::function`), // and in the callback we want to do type conversion and complete the Python future. -// However, Cython can't wrap a Python callable to a stateful C++ std::function. // -// Fortunately, Cython can convert *pure* Cython functions to C++ function pointers. -// Hence we make this C++ Functor `PyCallback` to wrap Python calls to C++ callbacks. +// An ideal API would be a `std::function` that wraps a Python callable, which +// holds references to a Python `Future`. However, Cython can't wrap a Python callable +// into a stateful C++ std::function. Instead we have to define Cython `cdef` functions +// who are translated to C++ functions, and use their function pointers. +// +// Because we can only work with stateless Cython functions, we need to keep the Future +// as a void* in this functor. This functor does not manage its lifetime: it assumes the +// void* is always valid. We Py_INCREF the Future in `incremented_fut` before passing it +// to PyCallback, and Py_DECREF it in `assign_and_decrement_fut` after the completion. // // Different APIs have different type signatures, but the code of completing the future // is the same. So we ask 2 Cython function pointers: `Converter` and `Assigner`. // `Converter` is unique for each API, converting C++ types to Python types. // `Assigner` is shared by all APIs, completing the Python future. // -// One issue is the `Assigner` have to be stateless, so we need to keep the Future -// in the functor. But we don't want to expose the Future to C++ too much, so we keep -// it as a void*. For that, we Py_INCREF the Future in the functor and Py_DECREF it -// after the completion. -// // On C++ async API calling: // 1. Create a Future. // 2. Creates a `PyCallback` functor with `Converter` and `Assigner` and the Future. From a4d968a60872ffcb67a36d306291aa91d55d518e Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 26 Jul 2024 15:53:22 -0700 Subject: [PATCH 08/10] lint Signed-off-by: Ruiyang Wang --- python/ray/includes/gcs_client.pxi | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 17358d1496f4..92dd1b0abc0d 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -91,8 +91,10 @@ cdef class NewGcsClient: optional[c_string] opt_value = c_string() CRayStatus status with nogil: - status = self.inner.get().InternalKV().Get(ns, key, timeout_ms, opt_value.value()) - return raise_or_return(convert_optional_str_none_for_not_found(status, opt_value)) + status = self.inner.get().InternalKV().Get( + ns, key, timeout_ms, opt_value.value()) + return raise_or_return( + convert_optional_str_none_for_not_found(status, opt_value)) def internal_kv_multi_get( self, keys: List[bytes], namespace=None, timeout=None @@ -101,10 +103,12 @@ cdef class NewGcsClient: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_keys = [key for key in keys] - optional[unordered_map[c_string, c_string]] opt_values = unordered_map[c_string, c_string]() + optional[unordered_map[c_string, c_string]] opt_values = \ + unordered_map[c_string, c_string]() CRayStatus status with nogil: - status = self.inner.get().InternalKV().MultiGet(ns, c_keys, timeout_ms, opt_values.value()) + status = self.inner.get().InternalKV().MultiGet( + ns, c_keys, timeout_ms, opt_values.value()) return raise_or_return(convert_optional_multi_get(status, opt_values)) def internal_kv_put(self, c_string key, c_string value, c_bool overwrite=False, @@ -118,7 +122,8 @@ cdef class NewGcsClient: optional[c_bool] opt_added = 0 CRayStatus status with nogil: - status = self.inner.get().InternalKV().Put(ns, key, value, overwrite, timeout_ms, opt_added.value()) + status = self.inner.get().InternalKV().Put( + ns, key, value, overwrite, timeout_ms, opt_added.value()) added = raise_or_return(convert_optional_bool(status, opt_added)) return 1 if added else 0 @@ -133,7 +138,8 @@ cdef class NewGcsClient: optional[int] opt_num_deleted = 0 CRayStatus status with nogil: - status = self.inner.get().InternalKV().Del(ns, key, del_by_prefix, timeout_ms, opt_num_deleted.value()) + status = self.inner.get().InternalKV().Del( + ns, key, del_by_prefix, timeout_ms, opt_num_deleted.value()) return raise_or_return(convert_optional_int(status, opt_num_deleted)) def internal_kv_keys( @@ -145,7 +151,8 @@ cdef class NewGcsClient: optional[c_vector[c_string]] opt_keys = c_vector[c_string]() CRayStatus status with nogil: - status = self.inner.get().InternalKV().Keys(ns, prefix, timeout_ms, opt_keys.value()) + status = self.inner.get().InternalKV().Keys( + ns, prefix, timeout_ms, opt_keys.value()) return raise_or_return(convert_optional_vector_str(status, opt_keys)) def internal_kv_exists(self, c_string key, namespace=None, timeout=None) -> bool: @@ -155,7 +162,8 @@ cdef class NewGcsClient: optional[c_bool] opt_exists = 0 CRayStatus status with nogil: - status = self.inner.get().InternalKV().Exists(ns, key, timeout_ms, opt_exists.value()) + status = self.inner.get().InternalKV().Exists( + ns, key, timeout_ms, opt_exists.value()) return raise_or_return(convert_optional_bool(status, opt_exists)) ############################################################# @@ -283,7 +291,8 @@ cdef class NewGcsClient: c_vector[c_bool] results CRayStatus status with nogil: - status = self.inner.get().Nodes().CheckAlive(c_node_ips, timeout_ms, results) + status = self.inner.get().Nodes().CheckAlive( + c_node_ips, timeout_ms, results) return raise_or_return(convert_multi_bool(status, move(results))) def async_check_alive( @@ -316,7 +325,8 @@ cdef class NewGcsClient: for node_id in node_ids: c_node_ids.push_back(CNodeID.FromBinary(node_id)) with nogil: - status = self.inner.get().Nodes().DrainNodes(c_node_ids, timeout_ms, results) + status = self.inner.get().Nodes().DrainNodes( + c_node_ids, timeout_ms, results) return raise_or_return(convert_multi_str(status, move(results))) def get_all_node_info( From f666756fb16bf482adf237f78dc12b1e734fedeb Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 26 Jul 2024 17:05:02 -0700 Subject: [PATCH 09/10] add assert Signed-off-by: Ruiyang Wang --- python/ray/includes/gcs_client.pxi | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 92dd1b0abc0d..794680c910fd 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -569,6 +569,7 @@ cdef convert_optional_str_none_for_not_found( if status.IsNotFound(): return None, None check_status_timeout_as_rpc_error(status) + assert c_str.has_value() return c_str.value(), None except Exception as e: return None, e @@ -579,6 +580,8 @@ cdef convert_optional_multi_get( cdef unordered_map[c_string, c_string].const_iterator it try: check_status_timeout_as_rpc_error(status) + assert c_map.has_value() + result = {} it = c_map.value().const_begin() while it != c_map.value().const_end(): @@ -594,6 +597,7 @@ cdef convert_optional_int(CRayStatus status, const optional[int]& c_int): # -> int try: check_status_timeout_as_rpc_error(status) + assert c_int.has_value() return c_int.value(), None except Exception as e: return None, e @@ -601,25 +605,31 @@ cdef convert_optional_int(CRayStatus status, const optional[int]& c_int): cdef convert_optional_vector_str( CRayStatus status, const optional[c_vector[c_string]]& c_vec): # -> Dict[str, str] + cdef const c_vector[c_string]* vec + cdef c_vector[c_string].const_iterator it try: check_status_timeout_as_rpc_error(status) + + assert c_vec.has_value() + vec = &c_vec.value() + it = vec.const_begin() + result = [] + while it != dereference(vec).const_end(): + result.append(dereference(it)) + postincrement(it) + return result, None except Exception as e: return None, e - cdef const c_vector[c_string]* vec = &c_vec.value() - cdef c_vector[c_string].const_iterator it = dereference(vec).const_begin() - result = [] - while it != dereference(vec).const_end(): - result.append(dereference(it)) - postincrement(it) - return result, None + cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b): # -> bool try: check_status_timeout_as_rpc_error(status) + assert b.has_value() + return b.value(), None except Exception as e: return None, e - return b.value(), None cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): # -> List[bool] From b9ed108c4b972eb6c42a1c1c27e6e8b51612c6c2 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 26 Jul 2024 22:23:24 -0700 Subject: [PATCH 10/10] fix type Signed-off-by: Ruiyang Wang --- python/ray/includes/gcs_client.pxi | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 794680c910fd..e366e535b633 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -529,7 +529,7 @@ cdef raise_or_return(tup): cdef convert_get_all_node_info( CRayStatus status, c_vector[CGcsNodeInfo]&& c_data): - # -> Dict[JobID, gcs_pb2.JobTableData] + # -> Dict[NodeID, gcs_pb2.GcsNodeInfo] cdef c_string b try: check_status_timeout_as_rpc_error(status) @@ -538,7 +538,7 @@ cdef convert_get_all_node_info( b = c_proto.SerializeAsString() proto = gcs_pb2.GcsNodeInfo() proto.ParseFromString(b) - node_table_data[proto.node_id] = proto + node_table_data[NodeID.from_binary(proto.node_id)] = proto return node_table_data, None except Exception as e: return None, e @@ -554,7 +554,7 @@ cdef convert_get_all_job_info( b = c_proto.SerializeAsString() proto = gcs_pb2.JobTableData() proto.ParseFromString(b) - job_table_data[proto.job_id] = proto + job_table_data[JobID.from_binary(proto.job_id)] = proto return job_table_data, None except Exception as e: return None, e