Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Async APIs for the New GcsClient. #46788

Merged
merged 11 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2361,6 +2361,13 @@ flatbuffer_cc_library(
out_prefix = "ray/raylet/format/",
)

ray_cc_library(
name = "python_callbacks",
hdrs = [
"src/ray/gcs/gcs_client/python_callbacks.h",
],
)

pyx_library(
name = "_raylet",
srcs = glob([
Expand Down Expand Up @@ -2389,6 +2396,7 @@ pyx_library(
linkstatic = 1,
),
deps = [
":python_callbacks",
"//:core_worker_lib",
"//:exported_internal",
"//:gcs_server_lib",
Expand Down
56 changes: 53 additions & 3 deletions python/ray/_private/gcs_aio_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import logging
from typing import Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor
from ray._raylet import GcsClient, JobID
from ray._raylet import GcsClient, NewGcsClient, JobID
from ray.core.generated import (
gcs_pb2,
)
Expand All @@ -14,14 +15,63 @@
# If the arg `executor` in GcsAioClient constructor is set, use it.
# Otherwise if env var `GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT` is set, use it.
# Otherwise, use 5.
# This is only used for the OldGcsAioClient.
GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT = env_integer(
"GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT", 5
)


logger = logging.getLogger(__name__)


class GcsAioClient:
"""
Async GCS client.

This class is in transition to use the new C++ GcsClient binding. The old
PythonGcsClient binding is not deleted until we are confident that the new
binding is stable.

Defaults to the new binding. If you want to use the old binding, please
set the environment variable `RAY_USE_OLD_GCS_CLIENT=1`.
"""

def __new__(cls, *args, **kwargs):
use_old_client = os.getenv("RAY_USE_OLD_GCS_CLIENT") == "1"
logger.debug(f"Using {'old' if use_old_client else 'new'} GCS client")
if use_old_client:
return OldGcsAioClient(*args, **kwargs)
else:
return NewGcsAioClient(*args, **kwargs)


class NewGcsAioClient:
def __init__(
self,
address: str = None,
loop=None,
executor=None,
nums_reconnect_retry: int = 5,
):
# See https://github.com/ray-project/ray/blob/d0b46eff9ddcf9ec7256dd3a6dda33e7fb7ced95/python/ray/_raylet.pyx#L2693 # noqa: E501
timeout_ms = 1000 * (nums_reconnect_retry + 1)
self.inner = NewGcsClient.standalone(
str(address), cluster_id=None, timeout_ms=timeout_ms
)
Comment on lines +57 to +59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we can share the same underlying GCSClient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes maybe in a next PR

# Forwarded Methods. Not using __getattr__ because we want one fewer layer of
# indirection.
self.internal_kv_get = self.inner.async_internal_kv_get
self.internal_kv_multi_get = self.inner.async_internal_kv_multi_get
self.internal_kv_put = self.inner.async_internal_kv_put
self.internal_kv_del = self.inner.async_internal_kv_del
self.internal_kv_exists = self.inner.async_internal_kv_exists
self.internal_kv_keys = self.inner.async_internal_kv_keys
self.check_alive = self.inner.async_check_alive
self.get_all_job_info = self.inner.async_get_all_job_info
# Forwarded Properties.
self.address = self.inner.address
self.cluster_id = self.inner.cluster_id


class AsyncProxy:
def __init__(self, inner, loop, executor):
self.inner = inner
Expand All @@ -45,7 +95,7 @@ def __getattr__(self, name):
return attr


class GcsAioClient:
class OldGcsAioClient:
def __init__(
self,
loop=None,
Expand Down
108 changes: 92 additions & 16 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ cdef extern from * namespace "polyfill" nogil:


cdef extern from "ray/common/status.h" namespace "ray" nogil:
cdef cppclass StatusCode:
# TODO(ryw) in Cython 3.x we can directly use `cdef enum class CStatusCode`
cdef cppclass CStatusCode "ray::StatusCode":
pass
cdef CStatusCode CStatusCode_OK "ray::StatusCode::OK"
c_bool operator==(CStatusCode lhs, CStatusCode rhs)

cdef cppclass CRayStatus "ray::Status":
CRayStatus()
CRayStatus(StatusCode code, const c_string &msg)
CRayStatus(StatusCode code, const c_string &msg, int rpc_code)
CRayStatus(CStatusCode code, const c_string &msg)
CRayStatus(CStatusCode code, const c_string &msg, int rpc_code)
CRayStatus(const CRayStatus &s)

@staticmethod
Expand Down Expand Up @@ -135,7 +138,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:

c_string ToString()
c_string CodeAsString()
StatusCode code()
CStatusCode code()
c_string message()
int rpc_code()

Expand All @@ -145,18 +148,6 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
cdef CRayStatus RayStatus_NotImplemented "Status::NotImplemented"()


cdef extern from "ray/common/status.h" namespace "ray::StatusCode" nogil:
cdef StatusCode StatusCode_OK "OK"
cdef StatusCode StatusCode_OutOfMemory "OutOfMemory"
cdef StatusCode StatusCode_KeyError "KeyError"
cdef StatusCode StatusCode_TypeError "TypeError"
cdef StatusCode StatusCode_Invalid "Invalid"
cdef StatusCode StatusCode_IOError "IOError"
cdef StatusCode StatusCode_UnknownError "UnknownError"
cdef StatusCode StatusCode_NotImplemented "NotImplemented"
cdef StatusCode StatusCode_RedisError "RedisError"


cdef extern from "ray/common/id.h" namespace "ray" nogil:
const CTaskID GenerateTaskId(const CJobID &job_id,
const CTaskID &parent_task_id,
Expand Down Expand Up @@ -376,6 +367,43 @@ cdef extern from "ray/core_worker/common.h" nogil:
const CNodeID &GetSpilledNodeID() const
const c_bool GetDidSpill() const

cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs":
# Facility for async bindings. C++ APIs need a callback (`std::function<void(T)>`),
# and in the callback we want to do type conversion and complete the Python future.
# However, Cython can't wrap a Python callable to a stateful C++ std::function.

# Fortunately, Cython can convert *pure* Cython functions to C++ function pointers.
# Hence we make this C++ Functor `CpsHandler` to wrap Python calls to C++ callbacks.

# Different APIs have different type signatures, but the code of completing the future
# is the same. So we ask 2 Cython function pointers: `Handler` and `Continuation`.
# `Handler` is unique for each API, converting C++ types to Python types. `Continuation`
# is shared by all APIs, completing the Python future.

# One issue is the `Continuation` have to be stateless, so we need to keep the Future
# in the functor. But we don't want to expose the Future to C++ too much, so we keep
# it as a void*. For that, we Py_INCREF the Future in the functor and Py_DECREF it
# after the completion.

# On C++ async API calling:
# 1. Create a Future.
# 2. Creates a `CpsHandler` functor with `Handler` and `Continuation` and the Future.
# 3. Invokes the async API with the functor.

# On C++ async API completion:
# 1. The CpsHandler functor is called. It acquires GIL and:
# 2. The functor calls the Cython function `Handler` with C++ types. It returns
# # `Tuple[result, exception]`.
# 3. The functor calls the Cython function `Continuation` with the tuple and the
# # Future (as void*). It completes the Python future with the result or with the
# # exception.

cdef cppclass MultiItemCpsHandler[T]:
MultiItemCpsHandler(object (*)(CRayStatus, c_vector[T] &&) , void (object, void*), void*) nogil

cdef cppclass OptionalItemCpsHandler[T]:
OptionalItemCpsHandler(object (*)(CRayStatus, const optional[T]&) , void (object, void*), void*) nogil

cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor":
pass
Expand All @@ -385,12 +413,21 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
c_vector[CJobTableData] &result,
int64_t timeout_ms)

CRayStatus AsyncGetAll(
const MultiItemCpsHandler[CJobTableData] &callback,
int64_t timeout_ms)

cdef cppclass CNodeInfoAccessor "ray::gcs::NodeInfoAccessor":
CRayStatus CheckAlive(
const c_vector[c_string] &raylet_addresses,
int64_t timeout_ms,
c_vector[c_bool] &result)

CRayStatus AsyncCheckAlive(
const c_vector[c_string] &raylet_addresses,
int64_t timeout_ms,
const MultiItemCpsHandler[c_bool] &callback)

CRayStatus DrainNodes(
const c_vector[CNodeID] &node_ids,
int64_t timeout_ms,
Expand Down Expand Up @@ -445,6 +482,45 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
int64_t timeout_ms,
c_bool &exists)

CRayStatus AsyncInternalKVKeys(
const c_string &ns,
const c_string &prefix,
int64_t timeout_ms,
const OptionalItemCpsHandler[c_vector[c_string]] &callback)

CRayStatus AsyncInternalKVGet(
const c_string &ns,
const c_string &key,
int64_t timeout_ms,
const OptionalItemCpsHandler[c_string] &callback)

CRayStatus AsyncInternalKVMultiGet(
const c_string &ns,
const c_vector[c_string] &keys,
int64_t timeout_ms,
const OptionalItemCpsHandler[unordered_map[c_string, c_string]] &callback)

CRayStatus AsyncInternalKVPut(
const c_string &ns,
const c_string &key,
const c_string &value,
c_bool overwrite,
int64_t timeout_ms,
const OptionalItemCpsHandler[int] &callback)

CRayStatus AsyncInternalKVExists(
const c_string &ns,
const c_string &key,
int64_t timeout_ms,
const OptionalItemCpsHandler[c_bool] &callback)

CRayStatus AsyncInternalKVDel(
const c_string &ns,
const c_string &key,
c_bool del_by_prefix,
int64_t timeout_ms,
const OptionalItemCpsHandler[int] &callback)

cdef cppclass CRuntimeEnvAccessor "ray::gcs::RuntimeEnvAccessor":
CRayStatus PinRuntimeEnvUri(
const c_string &uri,
Expand Down
Loading