Skip to content

Commit

Permalink
[core] Async APIs for the New GcsClient. (#46788)
Browse files Browse the repository at this point in the history
Implements async binding of C++ GcsClient in Python as NewGcsAioClient.

Previously we only have sync GcsClient bindings, ones that blocks on
completion. To facilitate GcsAioClient we use python thread pool
executor - one dedicated thread blocked for the sync call, whose
underlying API is async. This is a big waste and we can do better.

The trick is to play the callback-to-async games wisely. Invoke a C++
async API with a python callback function; the callback serializes the
reply data or exception, then switch to the python asyncio thread and
complete a future. The future, in turn, is awaited by a converter
function that does any python-side treatment (e.g. python protobuf
deserialization, or converting to dict), then pass on to user code. The
end result is an async method just like Python-native ones.

This PR adds the NewGcsAioClient and uses it as implementation of
GcsAioClient by default. Can switch back to the OldGcsAioClient by
RAY_USE_OLD_GCS_CLIENT=1.

Signed-off-by: Ruiyang Wang <[email protected]>
  • Loading branch information
rynewang authored Jul 27, 2024
1 parent 2ac8712 commit 4783379
Show file tree
Hide file tree
Showing 6 changed files with 597 additions and 93 deletions.
8 changes: 8 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2361,6 +2361,13 @@ flatbuffer_cc_library(
out_prefix = "ray/raylet/format/",
)

ray_cc_library(
name = "python_callbacks",
hdrs = [
"src/ray/gcs/gcs_client/python_callbacks.h",
],
)

pyx_library(
name = "_raylet",
srcs = glob([
Expand Down Expand Up @@ -2389,6 +2396,7 @@ pyx_library(
linkstatic = 1,
),
deps = [
":python_callbacks",
"//:core_worker_lib",
"//:exported_internal",
"//:gcs_server_lib",
Expand Down
56 changes: 53 additions & 3 deletions python/ray/_private/gcs_aio_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import logging
from typing import Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor
from ray._raylet import GcsClient, JobID
from ray._raylet import GcsClient, NewGcsClient, JobID
from ray.core.generated import (
gcs_pb2,
)
Expand All @@ -14,14 +15,63 @@
# If the arg `executor` in GcsAioClient constructor is set, use it.
# Otherwise if env var `GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT` is set, use it.
# Otherwise, use 5.
# This is only used for the OldGcsAioClient.
GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT = env_integer(
"GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT", 5
)


logger = logging.getLogger(__name__)


class GcsAioClient:
"""
Async GCS client.
This class is in transition to use the new C++ GcsClient binding. The old
PythonGcsClient binding is not deleted until we are confident that the new
binding is stable.
Defaults to the new binding. If you want to use the old binding, please
set the environment variable `RAY_USE_OLD_GCS_CLIENT=1`.
"""

def __new__(cls, *args, **kwargs):
use_old_client = os.getenv("RAY_USE_OLD_GCS_CLIENT") == "1"
logger.debug(f"Using {'old' if use_old_client else 'new'} GCS client")
if use_old_client:
return OldGcsAioClient(*args, **kwargs)
else:
return NewGcsAioClient(*args, **kwargs)


class NewGcsAioClient:
def __init__(
self,
address: str = None,
loop=None,
executor=None,
nums_reconnect_retry: int = 5,
):
# See https://github.com/ray-project/ray/blob/d0b46eff9ddcf9ec7256dd3a6dda33e7fb7ced95/python/ray/_raylet.pyx#L2693 # noqa: E501
timeout_ms = 1000 * (nums_reconnect_retry + 1)
self.inner = NewGcsClient.standalone(
str(address), cluster_id=None, timeout_ms=timeout_ms
)
# Forwarded Methods. Not using __getattr__ because we want one fewer layer of
# indirection.
self.internal_kv_get = self.inner.async_internal_kv_get
self.internal_kv_multi_get = self.inner.async_internal_kv_multi_get
self.internal_kv_put = self.inner.async_internal_kv_put
self.internal_kv_del = self.inner.async_internal_kv_del
self.internal_kv_exists = self.inner.async_internal_kv_exists
self.internal_kv_keys = self.inner.async_internal_kv_keys
self.check_alive = self.inner.async_check_alive
self.get_all_job_info = self.inner.async_get_all_job_info
# Forwarded Properties.
self.address = self.inner.address
self.cluster_id = self.inner.cluster_id


class AsyncProxy:
def __init__(self, inner, loop, executor):
self.inner = inner
Expand All @@ -45,7 +95,7 @@ def __getattr__(self, name):
return attr


class GcsAioClient:
class OldGcsAioClient:
def __init__(
self,
loop=None,
Expand Down
82 changes: 66 additions & 16 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ cdef extern from * namespace "polyfill" nogil:


cdef extern from "ray/common/status.h" namespace "ray" nogil:
cdef cppclass StatusCode:
# TODO(ryw) in Cython 3.x we can directly use `cdef enum class CStatusCode`
cdef cppclass CStatusCode "ray::StatusCode":
pass
cdef CStatusCode CStatusCode_OK "ray::StatusCode::OK"
c_bool operator==(CStatusCode lhs, CStatusCode rhs)

cdef cppclass CRayStatus "ray::Status":
CRayStatus()
CRayStatus(StatusCode code, const c_string &msg)
CRayStatus(StatusCode code, const c_string &msg, int rpc_code)
CRayStatus(CStatusCode code, const c_string &msg)
CRayStatus(CStatusCode code, const c_string &msg, int rpc_code)
CRayStatus(const CRayStatus &s)

@staticmethod
Expand Down Expand Up @@ -135,7 +138,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:

c_string ToString()
c_string CodeAsString()
StatusCode code()
CStatusCode code()
c_string message()
int rpc_code()

Expand All @@ -145,18 +148,6 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
cdef CRayStatus RayStatus_NotImplemented "Status::NotImplemented"()


cdef extern from "ray/common/status.h" namespace "ray::StatusCode" nogil:
cdef StatusCode StatusCode_OK "OK"
cdef StatusCode StatusCode_OutOfMemory "OutOfMemory"
cdef StatusCode StatusCode_KeyError "KeyError"
cdef StatusCode StatusCode_TypeError "TypeError"
cdef StatusCode StatusCode_Invalid "Invalid"
cdef StatusCode StatusCode_IOError "IOError"
cdef StatusCode StatusCode_UnknownError "UnknownError"
cdef StatusCode StatusCode_NotImplemented "NotImplemented"
cdef StatusCode StatusCode_RedisError "RedisError"


cdef extern from "ray/common/id.h" namespace "ray" nogil:
const CTaskID GenerateTaskId(const CJobID &job_id,
const CTaskID &parent_task_id,
Expand Down Expand Up @@ -376,6 +367,17 @@ cdef extern from "ray/core_worker/common.h" nogil:
const CNodeID &GetSpilledNodeID() const
const c_bool GetDidSpill() const

cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs":
cdef cppclass MultiItemPyCallback[T]:
MultiItemPyCallback(
object (*)(CRayStatus, c_vector[T] &&),
void (object, void*), void*) nogil

cdef cppclass OptionalItemPyCallback[T]:
OptionalItemPyCallback(
object (*)(CRayStatus, const optional[T]&),
void (object, void*), void*) nogil

cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor":
pass
Expand All @@ -385,12 +387,21 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
c_vector[CJobTableData] &result,
int64_t timeout_ms)

CRayStatus AsyncGetAll(
const MultiItemPyCallback[CJobTableData] &callback,
int64_t timeout_ms)

cdef cppclass CNodeInfoAccessor "ray::gcs::NodeInfoAccessor":
CRayStatus CheckAlive(
const c_vector[c_string] &raylet_addresses,
int64_t timeout_ms,
c_vector[c_bool] &result)

CRayStatus AsyncCheckAlive(
const c_vector[c_string] &raylet_addresses,
int64_t timeout_ms,
const MultiItemPyCallback[c_bool] &callback)

CRayStatus DrainNodes(
const c_vector[CNodeID] &node_ids,
int64_t timeout_ms,
Expand Down Expand Up @@ -445,6 +456,45 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
int64_t timeout_ms,
c_bool &exists)

CRayStatus AsyncInternalKVKeys(
const c_string &ns,
const c_string &prefix,
int64_t timeout_ms,
const OptionalItemPyCallback[c_vector[c_string]] &callback)

CRayStatus AsyncInternalKVGet(
const c_string &ns,
const c_string &key,
int64_t timeout_ms,
const OptionalItemPyCallback[c_string] &callback)

CRayStatus AsyncInternalKVMultiGet(
const c_string &ns,
const c_vector[c_string] &keys,
int64_t timeout_ms,
const OptionalItemPyCallback[unordered_map[c_string, c_string]] &callback)

CRayStatus AsyncInternalKVPut(
const c_string &ns,
const c_string &key,
const c_string &value,
c_bool overwrite,
int64_t timeout_ms,
const OptionalItemPyCallback[int] &callback)

CRayStatus AsyncInternalKVExists(
const c_string &ns,
const c_string &key,
int64_t timeout_ms,
const OptionalItemPyCallback[c_bool] &callback)

CRayStatus AsyncInternalKVDel(
const c_string &ns,
const c_string &key,
c_bool del_by_prefix,
int64_t timeout_ms,
const OptionalItemPyCallback[int] &callback)

cdef cppclass CRuntimeEnvAccessor "ray::gcs::RuntimeEnvAccessor":
CRayStatus PinRuntimeEnvUri(
const c_string &uri,
Expand Down
Loading

0 comments on commit 4783379

Please sign in to comment.