Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add timeout and asyncio for internal kv. #25126

Merged
merged 8 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 137 additions & 12 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import enum
import logging
from typing import List, Optional
from typing import Awaitable, List, Optional
from functools import wraps
import time

Expand Down Expand Up @@ -190,6 +190,7 @@ def __init__(
assert isinstance(address, str)
channel = GcsChannel(gcs_address=address)
assert isinstance(channel, GcsChannel)
assert channel._aio is False
self._channel = channel
self._connect()
self._nums_reconnect_retry = nums_reconnect_retry
Expand All @@ -208,10 +209,12 @@ def address(self):
return self._channel._gcs_address

@_auto_reconnect
def internal_kv_get(self, key: bytes, namespace: Optional[bytes]) -> bytes:
def internal_kv_get(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> bytes:
logger.debug(f"internal_kv_get {key} {namespace}")
req = gcs_service_pb2.InternalKVGetRequest(namespace=namespace, key=key)
reply = self._kv_stub.InternalKVGet(req)
reply = self._kv_stub.InternalKVGet(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.value
elif reply.status.code == GcsCode.NotFound:
Expand All @@ -224,13 +227,21 @@ def internal_kv_get(self, key: bytes, namespace: Optional[bytes]) -> bytes:

@_auto_reconnect
def internal_kv_put(
self, key: bytes, value: bytes, overwrite: bool, namespace: Optional[bytes]
self,
key: bytes,
value: bytes,
overwrite: bool,
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> int:
logger.debug(f"internal_kv_put {key} {value} {overwrite} {namespace}")
req = gcs_service_pb2.InternalKVPutRequest(
namespace=namespace, key=key, value=value, overwrite=overwrite
namespace=namespace,
key=key,
value=value,
overwrite=overwrite,
)
reply = self._kv_stub.InternalKVPut(req)
reply = self._kv_stub.InternalKVPut(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.added_num
else:
Expand All @@ -241,13 +252,17 @@ def internal_kv_put(

@_auto_reconnect
def internal_kv_del(
self, key: bytes, del_by_prefix: bool, namespace: Optional[bytes]
self,
key: bytes,
del_by_prefix: bool,
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> int:
logger.debug(f"internal_kv_del {key} {del_by_prefix} {namespace}")
req = gcs_service_pb2.InternalKVDelRequest(
namespace=namespace, key=key, del_by_prefix=del_by_prefix
)
reply = self._kv_stub.InternalKVDel(req)
reply = self._kv_stub.InternalKVDel(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.deleted_num
else:
Expand All @@ -256,10 +271,12 @@ def internal_kv_del(
)

@_auto_reconnect
def internal_kv_exists(self, key: bytes, namespace: Optional[bytes]) -> bool:
def internal_kv_exists(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> bool:
logger.debug(f"internal_kv_exists {key} {namespace}")
req = gcs_service_pb2.InternalKVExistsRequest(namespace=namespace, key=key)
reply = self._kv_stub.InternalKVExists(req)
reply = self._kv_stub.InternalKVExists(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.exists
else:
Expand All @@ -270,11 +287,11 @@ def internal_kv_exists(self, key: bytes, namespace: Optional[bytes]) -> bool:

@_auto_reconnect
def internal_kv_keys(
self, prefix: bytes, namespace: Optional[bytes]
self, prefix: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> List[bytes]:
logger.debug(f"internal_kv_keys {prefix} {namespace}")
req = gcs_service_pb2.InternalKVKeysRequest(namespace=namespace, prefix=prefix)
reply = self._kv_stub.InternalKVKeys(req)
reply = self._kv_stub.InternalKVKeys(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.results
else:
Expand Down Expand Up @@ -302,6 +319,114 @@ def pin_runtime_env_uri(self, uri: str, expiration_s: int) -> None:
)


class GcsAioClient:
def __init__(
    self,
    channel: Optional[GcsChannel] = None,
    address: Optional[str] = None,
):
    """Create an asyncio GCS client from an existing channel or an address.

    Args:
        channel: A ``GcsChannel`` created with ``aio=True``. When omitted, a
            new one is built from ``address``.
        address: GCS address, required (as a ``str``) only when ``channel``
            is not given.
    """
    if channel is None:
        # No channel supplied: build an asyncio one from the address.
        assert isinstance(address, str)
        channel = GcsChannel(gcs_address=address, aio=True)

    assert isinstance(channel, GcsChannel)
    # The methods of this client ``await`` their stub calls, so the channel
    # must have been created in aio mode.
    assert channel._aio is True

    self._channel = channel
    self._connect()

def _connect(self):
    """Connect the channel and create the internal-KV service stub on it."""
    gcs_channel = self._channel
    gcs_channel.connect()
    # The stub is what the ``internal_kv_*`` methods issue their RPCs through.
    grpc_channel = gcs_channel.channel()
    self._kv_stub = gcs_service_pb2_grpc.InternalKVGcsServiceStub(grpc_channel)

async def internal_kv_get(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> Awaitable[bytes]:
logger.debug(f"internal_kv_get {key} {namespace}")
req = gcs_service_pb2.InternalKVGetRequest(namespace=namespace, key=key)
reply = await self._kv_stub.InternalKVGet(req, timeout=timeout)
Copy link
Contributor

@sriram-anyscale sriram-anyscale May 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed this "await" - doesn't this mean this statement blocks until the result is ready?

I think I get it now - it is the return type that is confusing - you are not really returning an Awaitable - you are just allowing the interpreter to do other things while "await"ing. So Philipp's comments are what you need to fix.

if reply.status.code == GcsCode.OK:
return reply.value
elif reply.status.code == GcsCode.NotFound:
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not an Awaitable[bytes]. The caller has to check that the return value is not None before calling await on the return object. It is better to return an Awaitable here that indicates the None situation (e.g., empty bytes)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think once the function is an async function, it doesn't matter here. The return value will be wrapped in an awaitable automatically:

(base) yic@ip-172-31-58-40:~ $ cat x.py
import asyncio

async def f():
    return None

async def main():
    v = await f()
    print("v=", v)

asyncio.run(main())
(base) yic@ip-172-31-58-40:~ $ python x.py
v= None
(base) yic@ip-172-31-58-40:~ $

else:
raise RuntimeError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this idiomatic in Python? To throw exceptions from a function that returns an Awaitable? In folly::future, you have the thenError() method to deal with async exception propagation. As this method is currently written, the caller has to be ready for a future in the normal case, None in some cases, or an exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is valid. I think the reason folly::future currently does it that way is that there is no coroutine support in C++. This is more like async operations in Hacklang.

f"Failed to get value for key {key} "
f"due to error {reply.status.message}"
)

async def internal_kv_put(
self,
key: bytes,
value: bytes,
overwrite: bool,
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> Awaitable[int]:
logger.debug(f"internal_kv_put {key} {value} {overwrite} {namespace}")
req = gcs_service_pb2.InternalKVPutRequest(
namespace=namespace,
key=key,
value=value,
overwrite=overwrite,
)
reply = await self._kv_stub.InternalKVPut(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.added_num
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the NotFound case?

Copy link
Contributor Author

@fishbone fishbone May 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the module lacks good documentation here. Let me put it in another PR.

NotFound won't be returned here since this is a PUT operation and the return value is the number of keys that were added. The return value is weird, but that is for legacy reasons: the first version of internal kv defined the return this way and we need to stay backward compatible for now.

If it's returned, it means something is wrong (a bug)

raise RuntimeError(
f"Failed to put value {value} to key {key} "
f"due to error {reply.status.message}"
)

async def internal_kv_del(
    self,
    key: bytes,
    del_by_prefix: bool,
    namespace: Optional[bytes],
    timeout: Optional[float] = None,
) -> int:
    """Delete ``key`` from the GCS internal KV store.

    Args:
        key: Key to delete.
        del_by_prefix: Forwarded to the request as ``del_by_prefix``.
        namespace: Namespace the key lives in; forwarded to the request.
        timeout: Passed to the gRPC call as its ``timeout``; defaults to
            ``None``.

    Returns:
        The ``deleted_num`` field of the reply.

    Raises:
        RuntimeError: If the reply status is not ``GcsCode.OK``.
    """
    # The return annotation is the awaited result (``int``): ``async def``
    # already wraps the value in a coroutine, so ``Awaitable[int]`` would
    # describe a doubly-awaitable result.
    logger.debug(f"internal_kv_del {key} {del_by_prefix} {namespace}")
    req = gcs_service_pb2.InternalKVDelRequest(
        namespace=namespace, key=key, del_by_prefix=del_by_prefix
    )
    reply = await self._kv_stub.InternalKVDel(req, timeout=timeout)
    if reply.status.code == GcsCode.OK:
        return reply.deleted_num
    raise RuntimeError(
        f"Failed to delete key {key} " f"due to error {reply.status.message}"
    )

async def internal_kv_exists(
    self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> bool:
    """Check whether ``key`` exists in the GCS internal KV store.

    Args:
        key: Key to look up.
        namespace: Namespace the key lives in; forwarded to the request.
        timeout: Passed to the gRPC call as its ``timeout``; defaults to
            ``None``.

    Returns:
        The ``exists`` field of the reply.

    Raises:
        RuntimeError: If the reply status is not ``GcsCode.OK``.
    """
    # The return annotation is the awaited result (``bool``): ``async def``
    # already wraps the value in a coroutine, so ``Awaitable[bool]`` would
    # describe a doubly-awaitable result.
    logger.debug(f"internal_kv_exists {key} {namespace}")
    req = gcs_service_pb2.InternalKVExistsRequest(namespace=namespace, key=key)
    reply = await self._kv_stub.InternalKVExists(req, timeout=timeout)
    if reply.status.code == GcsCode.OK:
        return reply.exists
    raise RuntimeError(
        f"Failed to check existence of key {key} "
        f"due to error {reply.status.message}"
    )

async def internal_kv_keys(
    self, prefix: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> List[bytes]:
    """List the keys in the GCS internal KV store for ``prefix``.

    Args:
        prefix: Prefix forwarded to the request.
        namespace: Namespace to list within; forwarded to the request.
        timeout: Passed to the gRPC call as its ``timeout``; defaults to
            ``None``.

    Returns:
        The ``results`` field of the reply.

    Raises:
        RuntimeError: If the reply status is not ``GcsCode.OK``.
    """
    # The return annotation is the awaited result (``List[bytes]``):
    # ``async def`` already wraps the value in a coroutine, so
    # ``Awaitable[List[bytes]]`` would describe a doubly-awaitable result.
    logger.debug(f"internal_kv_keys {prefix} {namespace}")
    req = gcs_service_pb2.InternalKVKeysRequest(namespace=namespace, prefix=prefix)
    reply = await self._kv_stub.InternalKVKeys(req, timeout=timeout)
    if reply.status.code == GcsCode.OK:
        return reply.results
    raise RuntimeError(
        f"Failed to list prefix {prefix} "
        f"due to error {reply.status.message}"
    )


def use_gcs_for_bootstrap():
"""In the current version of Ray, we always use the GCS to bootstrap.
(This was previously controlled by a feature flag.)
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ py_test_module_list(
"test_raylet_output.py",
"test_scheduling_performance.py",
"test_get_or_create_actor.py",
"test_gcs_utils.py",
],
size = "small",
extra_srcs = SRCS,
Expand Down
118 changes: 118 additions & 0 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import os
import sys
import contextlib
import signal
import pytest
import grpc
import ray._private.gcs_utils as gcs_utils
import ray


@contextlib.contextmanager
def stop_gcs_server():
    """Suspend the GCS server process for the duration of the ``with`` body.

    The process is sent ``SIGSTOP`` on entry and ``SIGCONT`` on exit. The
    resume signal is sent even when the body raises, so a failing test does
    not leave the GCS server stopped.
    """
    process = ray.worker._global_node.all_processes[
        ray.ray_constants.PROCESS_TYPE_GCS_SERVER
    ][0].process
    pid = process.pid
    os.kill(pid, signal.SIGSTOP)
    try:
        yield
    finally:
        # Without ``finally`` an exception in the body (e.g. a failed
        # assertion) would skip the resume and leave the server stopped.
        os.kill(pid, signal.SIGCONT)


def test_kv_basic(ray_start_regular):
gcs_address = ray.worker.global_worker.gcs_client.address
gcs_client = gcs_utils.GcsClient(address=gcs_address, nums_reconnect_retry=0)

assert gcs_client.internal_kv_get(b"A", b"NS") is None
assert gcs_client.internal_kv_put(b"A", b"B", False, b"NS") == 1
assert gcs_client.internal_kv_get(b"A", b"NS") == b"B"
assert gcs_client.internal_kv_put(b"A", b"C", False, b"NS") == 0
assert gcs_client.internal_kv_get(b"A", b"NS") == b"B"
assert gcs_client.internal_kv_put(b"A", b"C", True, b"NS") == 0
assert gcs_client.internal_kv_get(b"A", b"NS") == b"C"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test for getting non-existent key?

assert gcs_client.internal_kv_put(b"AA", b"B", False, b"NS") == 1
assert gcs_client.internal_kv_put(b"AB", b"B", False, b"NS") == 1
assert set(gcs_client.internal_kv_keys(b"A", b"NS")) == {b"A", b"AA", b"AB"}
assert gcs_client.internal_kv_del(b"A", False, b"NS") == 1
assert set(gcs_client.internal_kv_keys(b"A", b"NS")) == {b"AA", b"AB"}
assert gcs_client.internal_kv_keys(b"A", b"NSS") == []
assert gcs_client.internal_kv_del(b"A", True, b"NS") == 2
assert gcs_client.internal_kv_keys(b"A", b"NS") == []
assert gcs_client.internal_kv_del(b"A", False, b"NSS") == 0


@pytest.mark.skipif(sys.platform == "win32", reason="Windows doesn't have signals.")
def test_kv_timeout(ray_start_regular):
    """KV calls with a timeout raise "Deadline Exceeded" while GCS is stopped."""
    client = gcs_utils.GcsClient(
        address=ray.worker.global_worker.gcs_client.address,
        nums_reconnect_retry=0,
    )

    # One successful call before the server is stopped.
    added = client.internal_kv_put(b"A", b"", False, b"")
    assert added == 1

    # Each entry issues one RPC with a 2-unit timeout; order matches the
    # sequence put -> get -> keys -> del.
    stalled_calls = (
        lambda: client.internal_kv_put(b"A", b"B", False, b"NS", timeout=2),
        lambda: client.internal_kv_get(b"A", b"NS", timeout=2),
        lambda: client.internal_kv_keys(b"A", b"NS", timeout=2),
        lambda: client.internal_kv_del(b"A", True, b"NS", timeout=2),
    )

    with stop_gcs_server():
        for issue_call in stalled_calls:
            with pytest.raises(grpc.RpcError, match="Deadline Exceeded"):
                issue_call()


@pytest.mark.asyncio
async def test_kv_basic_aio(ray_start_regular):
gcs_client = gcs_utils.GcsAioClient(
address=ray.worker.global_worker.gcs_client.address
)

assert await gcs_client.internal_kv_get(b"A", b"NS") is None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sriram-anyscale the None case is also tested here

assert await gcs_client.internal_kv_put(b"A", b"B", False, b"NS") == 1
assert await gcs_client.internal_kv_get(b"A", b"NS") == b"B"
assert await gcs_client.internal_kv_put(b"A", b"C", False, b"NS") == 0
assert await gcs_client.internal_kv_get(b"A", b"NS") == b"B"
assert await gcs_client.internal_kv_put(b"A", b"C", True, b"NS") == 0
assert await gcs_client.internal_kv_get(b"A", b"NS") == b"C"
assert await gcs_client.internal_kv_put(b"AA", b"B", False, b"NS") == 1
assert await gcs_client.internal_kv_put(b"AB", b"B", False, b"NS") == 1
keys = await gcs_client.internal_kv_keys(b"A", b"NS")
assert set(keys) == {b"A", b"AA", b"AB"}
assert await gcs_client.internal_kv_del(b"A", False, b"NS") == 1
keys = await gcs_client.internal_kv_keys(b"A", b"NS")
assert set(keys) == {b"AA", b"AB"}
assert await gcs_client.internal_kv_keys(b"A", b"NSS") == []
assert await gcs_client.internal_kv_del(b"A", True, b"NS") == 2
assert await gcs_client.internal_kv_keys(b"A", b"NS") == []
assert await gcs_client.internal_kv_del(b"A", False, b"NSS") == 0


@pytest.mark.skipif(sys.platform == "win32", reason="Windows doesn't have signals.")
@pytest.mark.asyncio
async def test_kv_timeout_aio(ray_start_regular):
    """Async KV calls with a timeout raise "Deadline Exceeded" while GCS is stopped."""
    client = gcs_utils.GcsAioClient(
        address=ray.worker.global_worker.gcs_client.address
    )

    # Make sure gcs_client is connected
    added = await client.internal_kv_put(b"A", b"", False, b"")
    assert added == 1

    # Each entry creates one coroutine when called; they are created and
    # awaited one at a time, in the order put -> get -> keys -> del.
    stalled_calls = (
        lambda: client.internal_kv_put(b"A", b"B", False, b"NS", timeout=2),
        lambda: client.internal_kv_get(b"A", b"NS", timeout=2),
        lambda: client.internal_kv_keys(b"A", b"NS", timeout=2),
        lambda: client.internal_kv_del(b"A", True, b"NS", timeout=2),
    )

    with stop_gcs_server():
        for start_call in stalled_calls:
            with pytest.raises(grpc.RpcError, match="Deadline Exceeded"):
                await start_call()


if __name__ == "__main__":
    import pytest
    import sys

    # Run this file's tests with pytest's ``-sv`` flags and hand pytest's
    # return value to ``sys.exit`` so it becomes the process exit status.
    exit_code = pytest.main(["-sv", __file__])
    sys.exit(exit_code)