diff --git a/BUILD.bazel b/BUILD.bazel index a1988fec965a..b1693cea36f5 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2361,6 +2361,13 @@ flatbuffer_cc_library( out_prefix = "ray/raylet/format/", ) +ray_cc_library( + name = "python_callbacks", + hdrs = [ + "src/ray/gcs/gcs_client/python_callbacks.h", + ], +) + pyx_library( name = "_raylet", srcs = glob([ @@ -2389,6 +2396,7 @@ pyx_library( linkstatic = 1, ), deps = [ + ":python_callbacks", "//:core_worker_lib", "//:exported_internal", "//:gcs_server_lib", diff --git a/python/ray/_private/gcs_aio_client.py b/python/ray/_private/gcs_aio_client.py index 0469e2ea1f46..6afd57ea93b1 100644 --- a/python/ray/_private/gcs_aio_client.py +++ b/python/ray/_private/gcs_aio_client.py @@ -1,7 +1,8 @@ +import os import logging from typing import Dict, List, Optional from concurrent.futures import ThreadPoolExecutor -from ray._raylet import GcsClient, JobID +from ray._raylet import GcsClient, NewGcsClient, JobID from ray.core.generated import ( gcs_pb2, ) @@ -14,14 +15,63 @@ # If the arg `executor` in GcsAioClient constructor is set, use it. # Otherwise if env var `GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT` is set, use it. # Otherwise, use 5. +# This is only used for the OldGcsAioClient. GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT = env_integer( "GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT", 5 ) - logger = logging.getLogger(__name__) +class GcsAioClient: + """ + Async GCS client. + + This class is in transition to use the new C++ GcsClient binding. The old + PythonGcsClient binding is not deleted until we are confident that the new + binding is stable. + + Defaults to the new binding. If you want to use the old binding, please + set the environment variable `RAY_USE_OLD_GCS_CLIENT=1`. + """ + + def __new__(cls, *args, **kwargs): + use_old_client = os.getenv("RAY_USE_OLD_GCS_CLIENT") == "1" + logger.debug(f"Using {'old' if use_old_client else 'new'} GCS client") + if use_old_client: + return OldGcsAioClient(*args, **kwargs) + else: + return NewGcsAioClient(*args, **kwargs) + + +class NewGcsAioClient: + def __init__( + self, + address: str = None, + loop=None, + executor=None, + nums_reconnect_retry: int = 5, + ): + # See https://github.com/ray-project/ray/blob/d0b46eff9ddcf9ec7256dd3a6dda33e7fb7ced95/python/ray/_raylet.pyx#L2693 # noqa: E501 + timeout_ms = 1000 * (nums_reconnect_retry + 1) + self.inner = NewGcsClient.standalone( + str(address), cluster_id=None, timeout_ms=timeout_ms + ) + # Forwarded Methods. Not using __getattr__ because we want one fewer layer of + # indirection. + self.internal_kv_get = self.inner.async_internal_kv_get + self.internal_kv_multi_get = self.inner.async_internal_kv_multi_get + self.internal_kv_put = self.inner.async_internal_kv_put + self.internal_kv_del = self.inner.async_internal_kv_del + self.internal_kv_exists = self.inner.async_internal_kv_exists + self.internal_kv_keys = self.inner.async_internal_kv_keys + self.check_alive = self.inner.async_check_alive + self.get_all_job_info = self.inner.async_get_all_job_info + # Forwarded Properties. + self.address = self.inner.address + self.cluster_id = self.inner.cluster_id + + class AsyncProxy: def __init__(self, inner, loop, executor): self.inner = inner @@ -45,7 +95,7 @@ def __getattr__(self, name): return attr -class GcsAioClient: +class OldGcsAioClient: def __init__( self, loop=None, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 602d741a23b0..73a9a8c447df 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -44,13 +44,16 @@ cdef extern from * namespace "polyfill" nogil: cdef extern from "ray/common/status.h" namespace "ray" nogil: - cdef cppclass StatusCode: + # TODO(ryw) in Cython 3.x we can directly use `cdef enum class CStatusCode` + cdef cppclass CStatusCode "ray::StatusCode": pass + cdef CStatusCode CStatusCode_OK "ray::StatusCode::OK" + c_bool operator==(CStatusCode lhs, CStatusCode rhs) cdef cppclass CRayStatus "ray::Status": CRayStatus() - CRayStatus(StatusCode code, const c_string &msg) - CRayStatus(StatusCode code, const c_string &msg, int rpc_code) + CRayStatus(CStatusCode code, const c_string &msg) + CRayStatus(CStatusCode code, const c_string &msg, int rpc_code) CRayStatus(const CRayStatus &s) @staticmethod @@ -135,7 +138,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: c_string ToString() c_string CodeAsString() - StatusCode code() + CStatusCode code() c_string message() int rpc_code() @@ -145,18 +148,6 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: cdef CRayStatus RayStatus_NotImplemented "Status::NotImplemented"() -cdef extern from "ray/common/status.h" namespace "ray::StatusCode" nogil: - cdef StatusCode StatusCode_OK "OK" - cdef StatusCode StatusCode_OutOfMemory "OutOfMemory" - cdef StatusCode StatusCode_KeyError "KeyError" - cdef StatusCode StatusCode_TypeError "TypeError" - cdef StatusCode StatusCode_Invalid "Invalid" - cdef StatusCode StatusCode_IOError "IOError" - cdef StatusCode StatusCode_UnknownError "UnknownError" - cdef StatusCode StatusCode_NotImplemented "NotImplemented" - cdef StatusCode StatusCode_RedisError "RedisError" - - cdef extern from "ray/common/id.h" namespace "ray" nogil: const CTaskID GenerateTaskId(const CJobID &job_id, const CTaskID &parent_task_id, @@ -376,6 +367,17 @@ cdef extern from "ray/core_worker/common.h" nogil: const CNodeID &GetSpilledNodeID() const const c_bool GetDidSpill() const +cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs": + cdef cppclass MultiItemPyCallback[T]: + MultiItemPyCallback( + object (*)(CRayStatus, c_vector[T] &&), + void (object, void*), void*) nogil + + cdef cppclass OptionalItemPyCallback[T]: + OptionalItemPyCallback( + object (*)(CRayStatus, const optional[T]&), + void (object, void*), void*) nogil + cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor": pass @@ -385,12 +387,21 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: c_vector[CJobTableData] &result, int64_t timeout_ms) + CRayStatus AsyncGetAll( + const MultiItemPyCallback[CJobTableData] &callback, + int64_t timeout_ms) + cdef cppclass CNodeInfoAccessor "ray::gcs::NodeInfoAccessor": CRayStatus CheckAlive( const c_vector[c_string] &raylet_addresses, int64_t timeout_ms, c_vector[c_bool] &result) + CRayStatus AsyncCheckAlive( + const c_vector[c_string] &raylet_addresses, + int64_t timeout_ms, + const MultiItemPyCallback[c_bool] &callback) + CRayStatus DrainNodes( const c_vector[CNodeID] &node_ids, int64_t timeout_ms, @@ -445,6 +456,45 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: int64_t timeout_ms, c_bool &exists) + CRayStatus AsyncInternalKVKeys( + const c_string &ns, + const c_string &prefix, + int64_t timeout_ms, + const OptionalItemPyCallback[c_vector[c_string]] &callback) + + CRayStatus AsyncInternalKVGet( + const c_string &ns, + const c_string &key, + int64_t timeout_ms, + const OptionalItemPyCallback[c_string] &callback) + + CRayStatus AsyncInternalKVMultiGet( + const c_string &ns, + const c_vector[c_string] &keys, + int64_t timeout_ms, + const OptionalItemPyCallback[unordered_map[c_string, c_string]] &callback) + + CRayStatus AsyncInternalKVPut( + const c_string &ns, + const c_string &key, + const c_string &value, + c_bool overwrite, + int64_t timeout_ms, + const OptionalItemPyCallback[int] &callback) + + CRayStatus AsyncInternalKVExists( + const c_string &ns, + const c_string &key, + int64_t timeout_ms, + const OptionalItemPyCallback[c_bool] &callback) + + CRayStatus AsyncInternalKVDel( + const c_string &ns, + const c_string &key, + c_bool del_by_prefix, + int64_t timeout_ms, + const OptionalItemPyCallback[int] &callback) + cdef cppclass CRuntimeEnvAccessor "ray::gcs::RuntimeEnvAccessor": CRayStatus PinRuntimeEnvUri( const c_string &uri, diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 5552ace199aa..e366e535b633 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -13,16 +13,26 @@ Binding of C++ ray::gcs::GcsClient. # out to a separate translation unit because we need to access the singleton thread. # # We need to best-effort import everything we need. - +# +# For how async API are implemented, see src/ray/gcs/gcs_client/python_callbacks.h from asyncio import Future from typing import List +from libcpp.utility cimport move +import concurrent.futures from ray.includes.common cimport ( CGcsClient, CGetAllResourceUsageReply, ConnectOnSingletonIoContext, + CStatusCode, + CStatusCode_OK, + MultiItemPyCallback, + OptionalItemPyCallback, ) +from ray.includes.optional cimport optional from ray.core.generated import gcs_pb2 from cython.operator import dereference, postincrement +cimport cpython + cdef class NewGcsClient: cdef: @@ -78,15 +88,13 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_string value + optional[c_string] opt_value = c_string() CRayStatus status with nogil: - status = self.inner.get().InternalKV().Get(ns, key, timeout_ms, value) - if status.IsNotFound(): - return None - else: - check_status_timeout_as_rpc_error(status) - return value + status = self.inner.get().InternalKV().Get( + ns, key, timeout_ms, opt_value.value()) + return raise_or_return( + convert_optional_str_none_for_not_found(status, opt_value)) def internal_kv_multi_get( self, keys: List[bytes], namespace=None, timeout=None @@ -95,20 +103,13 @@ cdef class NewGcsClient: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_keys = [key for key in keys] - unordered_map[c_string, c_string] values + optional[unordered_map[c_string, c_string]] opt_values = \ + unordered_map[c_string, c_string]() + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().InternalKV().MultiGet(ns, c_keys, timeout_ms, values) - ) - - result = {} - it = values.begin() - while it != values.end(): - key = dereference(it).first - value = dereference(it).second - result[key] = value - postincrement(it) - return result + status = self.inner.get().InternalKV().MultiGet( + ns, c_keys, timeout_ms, opt_values.value()) + return raise_or_return(convert_optional_multi_get(status, opt_values)) def internal_kv_put(self, c_string key, c_string value, c_bool overwrite=False, namespace=None, timeout=None) -> int: @@ -118,13 +119,12 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_bool added = False + optional[c_bool] opt_added = 0 + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get() - .InternalKV() - .Put(ns, key, value, overwrite, timeout_ms, added) - ) + status = self.inner.get().InternalKV().Put( + ns, key, value, overwrite, timeout_ms, opt_added.value()) + added = raise_or_return(convert_optional_bool(status, opt_added)) return 1 if added else 0 def internal_kv_del(self, c_string key, c_bool del_by_prefix, @@ -135,14 +135,12 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - int num_deleted = 0 + optional[int] opt_num_deleted = 0 + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get() - .InternalKV() - .Del(ns, key, del_by_prefix, timeout_ms, num_deleted) - ) - return num_deleted + status = self.inner.get().InternalKV().Del( + ns, key, del_by_prefix, timeout_ms, opt_num_deleted.value()) + return raise_or_return(convert_optional_int(status, opt_num_deleted)) def internal_kv_keys( self, c_string prefix, namespace=None, timeout=None @@ -150,25 +148,136 @@ cdef class NewGcsClient: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_vector[c_string] keys + optional[c_vector[c_string]] opt_keys = c_vector[c_string]() + CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().InternalKV().Keys(ns, prefix, timeout_ms, keys) - ) - - result = [key for key in keys] - return result + status = self.inner.get().InternalKV().Keys( + ns, prefix, timeout_ms, opt_keys.value()) + return raise_or_return(convert_optional_vector_str(status, opt_keys)) def internal_kv_exists(self, c_string key, namespace=None, timeout=None) -> bool: cdef: c_string ns = namespace or b"" int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_bool exists = False + optional[c_bool] opt_exists = 0 + CRayStatus status + with nogil: + status = self.inner.get().InternalKV().Exists( + ns, key, timeout_ms, opt_exists.value()) + return raise_or_return(convert_optional_bool(status, opt_exists)) + + ############################################################# + # Internal KV async methods + ############################################################# + + def async_internal_kv_get( + self, c_string key, namespace=None, timeout=None + ) -> Future[Optional[bytes]]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( - self.inner.get().InternalKV().Exists(ns, key, timeout_ms, exists) - ) - return exists + self.inner.get().InternalKV().AsyncInternalKVGet( + ns, key, timeout_ms, + OptionalItemPyCallback[c_string]( + convert_optional_str_none_for_not_found, + assign_and_decrement_fut, + fut_ptr))) + return asyncio.wrap_future(fut) + + def async_internal_kv_multi_get( + self, keys: List[bytes], namespace=None, timeout=None + ) -> Future[Dict[bytes, bytes]]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_vector[c_string] c_keys = [key for key in keys] + fut = incremented_fut() + void* fut_ptr = fut + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVMultiGet( + ns, c_keys, timeout_ms, + OptionalItemPyCallback[unordered_map[c_string, c_string]]( + convert_optional_multi_get, + assign_and_decrement_fut, + fut_ptr))) + return asyncio.wrap_future(fut) + + def async_internal_kv_put( + self, c_string key, c_string value, c_bool overwrite=False, namespace=None, + timeout=None + ) -> Future[int]: + # TODO(ryw): the sync `internal_kv_put` returns bool while this async version + # returns int. We should make them consistent. + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut = incremented_fut() + void* fut_ptr = fut + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVPut( + ns, key, value, overwrite, timeout_ms, + OptionalItemPyCallback[int]( + convert_optional_int, + assign_and_decrement_fut, + fut_ptr))) + return asyncio.wrap_future(fut) + + def async_internal_kv_del(self, c_string key, c_bool del_by_prefix, + namespace=None, timeout=None) -> Future[int]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut = incremented_fut() + void* fut_ptr = fut + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVDel( + ns, key, del_by_prefix, timeout_ms, + OptionalItemPyCallback[int]( + convert_optional_int, + assign_and_decrement_fut, + fut_ptr))) + return asyncio.wrap_future(fut) + + def async_internal_kv_keys(self, c_string prefix, namespace=None, timeout=None + ) -> Future[List[bytes]]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut = incremented_fut() + void* fut_ptr = fut + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVKeys( + ns, prefix, timeout_ms, + OptionalItemPyCallback[c_vector[c_string]]( + convert_optional_vector_str, + assign_and_decrement_fut, + fut_ptr))) + return asyncio.wrap_future(fut) + + def async_internal_kv_exists(self, c_string key, namespace=None, timeout=None + ) -> Future[bool]: + cdef: + c_string ns = namespace or b"" + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut = incremented_fut() + void* fut_ptr = fut + with nogil: + check_status_timeout_as_rpc_error( + self.inner.get().InternalKV().AsyncInternalKVExists( + ns, key, timeout_ms, + OptionalItemPyCallback[c_bool]( + convert_optional_bool, + assign_and_decrement_fut, + fut_ptr))) + return asyncio.wrap_future(fut) ############################################################# # NodeInfo methods @@ -180,11 +289,29 @@ cdef class NewGcsClient: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[c_string] c_node_ips = [ip for ip in node_ips] c_vector[c_bool] results + CRayStatus status + with nogil: + status = self.inner.get().Nodes().CheckAlive( + c_node_ips, timeout_ms, results) + return raise_or_return(convert_multi_bool(status, move(results))) + + def async_check_alive( + self, node_ips: List[bytes], timeout: Optional[float] = None + ) -> Future[List[bool]]: + cdef: + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + c_vector[c_string] c_node_ips = [ip for ip in node_ips] + fut = incremented_fut() + void* fut_ptr = fut with nogil: check_status_timeout_as_rpc_error( - self.inner.get().Nodes().CheckAlive(c_node_ips, timeout_ms, results) - ) - return [result for result in results] + self.inner.get().Nodes().AsyncCheckAlive( + c_node_ips, timeout_ms, + MultiItemPyCallback[c_bool]( + &convert_multi_bool, + assign_and_decrement_fut, + fut_ptr))) + return asyncio.wrap_future(fut) def drain_nodes( self, node_ids: List[bytes], timeout: Optional[float] = None @@ -194,32 +321,23 @@ cdef class NewGcsClient: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 c_vector[CNodeID] c_node_ids c_vector[c_string] results + CRayStatus status for node_id in node_ids: c_node_ids.push_back(CNodeID.FromBinary(node_id)) with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().Nodes().DrainNodes(c_node_ids, timeout_ms, results) - ) - return [result for result in results] + status = self.inner.get().Nodes().DrainNodes( + c_node_ids, timeout_ms, results) + return raise_or_return(convert_multi_str(status, move(results))) def get_all_node_info( self, timeout: Optional[float] = None ) -> Dict[NodeID, gcs_pb2.GcsNodeInfo]: cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 cdef c_vector[CGcsNodeInfo] reply - cdef c_vector[c_string] serialized_reply + cdef CRayStatus status with nogil: - check_status_timeout_as_rpc_error( - self.inner.get().Nodes().GetAllNoCache(timeout_ms, reply) - ) - for node in reply: - serialized_reply.push_back(node.SerializeAsString()) - ret = {} - for serialized in serialized_reply: - proto = gcs_pb2.GcsNodeInfo() - proto.ParseFromString(serialized) - ret[NodeID.from_binary(proto.node_id)] = proto - return ret + status = self.inner.get().Nodes().GetAllNoCache(timeout_ms, reply) + return raise_or_return(convert_get_all_node_info(status, move(reply))) ############################################################# # NodeResources methods @@ -248,20 +366,30 @@ cdef class NewGcsClient: self, timeout: Optional[float] = None ) -> Dict[JobID, gcs_pb2.JobTableData]: cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + cdef CRayStatus status cdef c_vector[CJobTableData] reply cdef c_vector[c_string] serialized_reply + with nogil: + status = self.inner.get().Jobs().GetAll(reply, timeout_ms) + return raise_or_return((convert_get_all_job_info(status, move(reply)))) + + def async_get_all_job_info( + self, timeout: Optional[float] = None + ) -> Future[Dict[str, gcs_pb2.JobTableData]]: + cdef: + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + fut = incremented_fut() + void* fut_ptr = fut + with nogil: check_status_timeout_as_rpc_error( - self.inner.get().Jobs().GetAll(reply, timeout_ms) - ) - for i in range(reply.size()): - serialized_reply.push_back(reply[i].SerializeAsString()) - ret = {} - for serialized in serialized_reply: - proto = gcs_pb2.JobTableData() - proto.ParseFromString(serialized) - ret[JobID.from_binary(proto.job_id)] = proto - return ret + self.inner.get().Jobs().AsyncGetAll( + MultiItemPyCallback[CJobTableData]( + convert_get_all_job_info, + assign_and_decrement_fut, + fut_ptr), + timeout_ms)) + return asyncio.wrap_future(fut) ############################################################# # Runtime Env methods @@ -360,3 +488,161 @@ cdef class NewGcsClient: rejection_reason_message)) return (is_accepted, rejection_reason_message.decode()) + + +# Util functions for async handling + +cdef incremented_fut(): + fut = concurrent.futures.Future() + cpython.Py_INCREF(fut) + return fut + +cdef void assign_and_decrement_fut(result, void* fut_ptr): + cdef fut = fut_ptr + assert isinstance(fut, concurrent.futures.Future) + + assert not fut.done() + try: + ret, exc = result + if exc: + fut.set_exception(exc) + else: + fut.set_result(ret) + finally: + cpython.Py_DECREF(fut) + +cdef raise_or_return(tup): + ret, exc = tup + if exc: + raise exc + return ret + +############################################################# +# Converter functions: C++ types -> Python types, use by both Sync and Async APIs. +# They have to be defined here as pure functions because a function pointer is passed +# to C++ for Async APIs. +# +# Each function accepts what the C++ callback passes, typically a Status and a value. +# Returns `Tuple[object, Optional[Exception]]` (we are all gophers now lol). +# Must not raise exceptions, or it crashes the process. +############################################################# + +cdef convert_get_all_node_info( + CRayStatus status, c_vector[CGcsNodeInfo]&& c_data): + # -> Dict[NodeID, gcs_pb2.GcsNodeInfo] + cdef c_string b + try: + check_status_timeout_as_rpc_error(status) + node_table_data = {} + for c_proto in c_data: + b = c_proto.SerializeAsString() + proto = gcs_pb2.GcsNodeInfo() + proto.ParseFromString(b) + node_table_data[NodeID.from_binary(proto.node_id)] = proto + return node_table_data, None + except Exception as e: + return None, e + +cdef convert_get_all_job_info( + CRayStatus status, c_vector[CJobTableData]&& c_data): + # -> Dict[JobID, gcs_pb2.JobTableData] + cdef c_string b + try: + check_status_timeout_as_rpc_error(status) + job_table_data = {} + for c_proto in c_data: + b = c_proto.SerializeAsString() + proto = gcs_pb2.JobTableData() + proto.ParseFromString(b) + job_table_data[JobID.from_binary(proto.job_id)] = proto + return job_table_data, None + except Exception as e: + return None, e + +cdef convert_optional_str_none_for_not_found( + CRayStatus status, const optional[c_string]& c_str): + # If status is NotFound, return None. + # If status is OK, return the value. + # Else, raise exception. + # -> Optional[bytes] + try: + if status.IsNotFound(): + return None, None + check_status_timeout_as_rpc_error(status) + assert c_str.has_value() + return c_str.value(), None + except Exception as e: + return None, e + +cdef convert_optional_multi_get( + CRayStatus status, const optional[unordered_map[c_string, c_string]]& c_map): + # -> Dict[str, str] + cdef unordered_map[c_string, c_string].const_iterator it + try: + check_status_timeout_as_rpc_error(status) + assert c_map.has_value() + + result = {} + it = c_map.value().const_begin() + while it != c_map.value().const_end(): + key = dereference(it).first + value = dereference(it).second + result[key] = value + postincrement(it) + return result, None + except Exception as e: + return None, e + +cdef convert_optional_int(CRayStatus status, const optional[int]& c_int): + # -> int + try: + check_status_timeout_as_rpc_error(status) + assert c_int.has_value() + return c_int.value(), None + except Exception as e: + return None, e + +cdef convert_optional_vector_str( + CRayStatus status, const optional[c_vector[c_string]]& c_vec): + # -> Dict[str, str] + cdef const c_vector[c_string]* vec + cdef c_vector[c_string].const_iterator it + try: + check_status_timeout_as_rpc_error(status) + + assert c_vec.has_value() + vec = &c_vec.value() + it = vec.const_begin() + result = [] + while it != dereference(vec).const_end(): + result.append(dereference(it)) + postincrement(it) + return result, None + except Exception as e: + return None, e + + +cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b): + # -> bool + try: + check_status_timeout_as_rpc_error(status) + assert b.has_value() + return b.value(), None + except Exception as e: + return None, e + +cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): + # -> List[bool] + try: + check_status_timeout_as_rpc_error(status) + return [b for b in c_data], None + except Exception as e: + return None, e + +cdef convert_multi_str(CRayStatus status, c_vector[c_string]&& c_data): + # -> List[bytes] + try: + check_status_timeout_as_rpc_error(status) + return [datum for datum in c_data], None + except Exception as e: + return None, e diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index e546790d596b..f9e299315c53 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -853,6 +853,7 @@ class InternalKVAccessor { /// \param added It's an output parameter. It'll be set to be true if /// any row is added. /// \return Status + /// TODO(ryw): change the out parameter type to `int` just like AsyncInternalKVPut. virtual Status Put(const std::string &ns, const std::string &key, const std::string &value, diff --git a/src/ray/gcs/gcs_client/python_callbacks.h b/src/ray/gcs/gcs_client/python_callbacks.h new file mode 100644 index 000000000000..baabda221efc --- /dev/null +++ b/src/ray/gcs/gcs_client/python_callbacks.h @@ -0,0 +1,109 @@ +// Copyright 2024 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include + +#include "ray/util/logging.h" + +namespace ray { +namespace gcs { + +class PythonGilHolder { + public: + PythonGilHolder() : state_(PyGILState_Ensure()) {} + ~PythonGilHolder() { PyGILState_Release(state_); } + + private: + PyGILState_STATE state_; +}; + +// Facility for async bindings. C++ APIs need a callback (`std::function`), +// and in the callback we want to do type conversion and complete the Python future. +// +// An ideal API would be a `std::function` that wraps a Python callable, which +// holds references to a Python `Future`. However, Cython can't wrap a Python callable +// into a stateful C++ std::function. Instead we have to define Cython `cdef` functions +// who are translated to C++ functions, and use their function pointers. +// +// Because we can only work with stateless Cython functions, we need to keep the Future +// as a void* in this functor. This functor does not manage its lifetime: it assumes the +// void* is always valid. We Py_INCREF the Future in `incremented_fut` before passing it +// to PyCallback, and Py_DECREF it in `assign_and_decrement_fut` after the completion. +// +// Different APIs have different type signatures, but the code of completing the future +// is the same. So we ask 2 Cython function pointers: `Converter` and `Assigner`. +// `Converter` is unique for each API, converting C++ types to Python types. +// `Assigner` is shared by all APIs, completing the Python future. +// +// On C++ async API calling: +// 1. Create a Future. +// 2. Creates a `PyCallback` functor with `Converter` and `Assigner` and the Future. +// 3. Invokes the async API with the functor. +// +// On C++ async API completion: +// 1. The PyCallback functor is called. It acquires GIL and: +// 2. The functor calls the Cython function `Converter` with C++ types. It returns +// `Tuple[result, exception]`. +// 3. The functor calls the Cython function `Assigner` with the tuple and the +// Future (as void*). It assign the result or the exception to the Python future. +template +class PyCallback { + public: + // The Converter is a Cython function that takes Args... and returns a PyObject*. + // It must not raise exceptions. + // The return PyObject* is passed to the Assigner. + using Converter = PyObject *(*)(Args...); + // It must not raise exceptions. + using Assigner = void (*)(PyObject *, void *); + + PyCallback(Converter converter, Assigner assigner, void *context) + : converter(converter), assigner(assigner), context(context) {} + + void operator()(Args &&...args) { + PythonGilHolder gil; + PyObject *result = converter(std::forward(args)...); + CheckNoException(); + + assigner(result, context); + CheckNoException(); + } + + void CheckNoException() { + if (PyErr_Occurred() != nullptr) { + PyErr_Print(); + PyErr_Clear(); + RAY_LOG(FATAL) << "Python exception occurred in async binding code, exiting!"; + } + } + + private: + Converter converter = nullptr; + Assigner assigner = nullptr; + void *context = nullptr; +}; + +template +using MultiItemPyCallback = PyCallback &&>; + +template +using OptionalItemPyCallback = PyCallback &>; + +} // namespace gcs + +} // namespace ray