-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Add timeout and asyncio for internal kv. #25126
Changes from all commits
0e0b581
686f9bb
d9a7156
a73e6da
978231a
6688acc
a152d86
4d1fbd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
import enum | ||
import logging | ||
from typing import List, Optional | ||
from typing import Awaitable, List, Optional | ||
from functools import wraps | ||
import time | ||
|
||
|
@@ -190,6 +190,7 @@ def __init__( | |
assert isinstance(address, str) | ||
channel = GcsChannel(gcs_address=address) | ||
assert isinstance(channel, GcsChannel) | ||
assert channel._aio is False | ||
self._channel = channel | ||
self._connect() | ||
self._nums_reconnect_retry = nums_reconnect_retry | ||
|
@@ -208,10 +209,12 @@ def address(self): | |
return self._channel._gcs_address | ||
|
||
@_auto_reconnect | ||
def internal_kv_get(self, key: bytes, namespace: Optional[bytes]) -> bytes: | ||
def internal_kv_get( | ||
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None | ||
) -> bytes: | ||
logger.debug(f"internal_kv_get {key} {namespace}") | ||
req = gcs_service_pb2.InternalKVGetRequest(namespace=namespace, key=key) | ||
reply = self._kv_stub.InternalKVGet(req) | ||
reply = self._kv_stub.InternalKVGet(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.value | ||
elif reply.status.code == GcsCode.NotFound: | ||
|
@@ -224,13 +227,21 @@ def internal_kv_get(self, key: bytes, namespace: Optional[bytes]) -> bytes: | |
|
||
@_auto_reconnect | ||
def internal_kv_put( | ||
self, key: bytes, value: bytes, overwrite: bool, namespace: Optional[bytes] | ||
self, | ||
key: bytes, | ||
value: bytes, | ||
overwrite: bool, | ||
namespace: Optional[bytes], | ||
timeout: Optional[float] = None, | ||
) -> int: | ||
logger.debug(f"internal_kv_put {key} {value} {overwrite} {namespace}") | ||
req = gcs_service_pb2.InternalKVPutRequest( | ||
namespace=namespace, key=key, value=value, overwrite=overwrite | ||
namespace=namespace, | ||
key=key, | ||
value=value, | ||
overwrite=overwrite, | ||
) | ||
reply = self._kv_stub.InternalKVPut(req) | ||
reply = self._kv_stub.InternalKVPut(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.added_num | ||
else: | ||
|
@@ -241,13 +252,17 @@ def internal_kv_put( | |
|
||
@_auto_reconnect | ||
def internal_kv_del( | ||
self, key: bytes, del_by_prefix: bool, namespace: Optional[bytes] | ||
self, | ||
key: bytes, | ||
del_by_prefix: bool, | ||
namespace: Optional[bytes], | ||
timeout: Optional[float] = None, | ||
) -> int: | ||
logger.debug(f"internal_kv_del {key} {del_by_prefix} {namespace}") | ||
req = gcs_service_pb2.InternalKVDelRequest( | ||
namespace=namespace, key=key, del_by_prefix=del_by_prefix | ||
) | ||
reply = self._kv_stub.InternalKVDel(req) | ||
reply = self._kv_stub.InternalKVDel(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.deleted_num | ||
else: | ||
|
@@ -256,10 +271,12 @@ def internal_kv_del( | |
) | ||
|
||
@_auto_reconnect | ||
def internal_kv_exists(self, key: bytes, namespace: Optional[bytes]) -> bool: | ||
def internal_kv_exists( | ||
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None | ||
) -> bool: | ||
logger.debug(f"internal_kv_exists {key} {namespace}") | ||
req = gcs_service_pb2.InternalKVExistsRequest(namespace=namespace, key=key) | ||
reply = self._kv_stub.InternalKVExists(req) | ||
reply = self._kv_stub.InternalKVExists(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.exists | ||
else: | ||
|
@@ -270,11 +287,11 @@ def internal_kv_exists(self, key: bytes, namespace: Optional[bytes]) -> bool: | |
|
||
@_auto_reconnect | ||
def internal_kv_keys( | ||
self, prefix: bytes, namespace: Optional[bytes] | ||
self, prefix: bytes, namespace: Optional[bytes], timeout: Optional[float] = None | ||
) -> List[bytes]: | ||
logger.debug(f"internal_kv_keys {prefix} {namespace}") | ||
req = gcs_service_pb2.InternalKVKeysRequest(namespace=namespace, prefix=prefix) | ||
reply = self._kv_stub.InternalKVKeys(req) | ||
reply = self._kv_stub.InternalKVKeys(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.results | ||
else: | ||
|
@@ -302,6 +319,114 @@ def pin_runtime_env_uri(self, uri: str, expiration_s: int) -> None: | |
) | ||
|
||
|
||
class GcsAioClient: | ||
def __init__( | ||
self, | ||
channel: Optional[GcsChannel] = None, | ||
address: Optional[str] = None, | ||
): | ||
if channel is None: | ||
assert isinstance(address, str) | ||
channel = GcsChannel(gcs_address=address, aio=True) | ||
assert isinstance(channel, GcsChannel) | ||
assert channel._aio is True | ||
self._channel = channel | ||
self._connect() | ||
|
||
def _connect(self): | ||
self._channel.connect() | ||
self._kv_stub = gcs_service_pb2_grpc.InternalKVGcsServiceStub( | ||
self._channel.channel() | ||
) | ||
|
||
async def internal_kv_get( | ||
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None | ||
) -> Awaitable[bytes]: | ||
logger.debug(f"internal_kv_get {key} {namespace}") | ||
req = gcs_service_pb2.InternalKVGetRequest(namespace=namespace, key=key) | ||
reply = await self._kv_stub.InternalKVGet(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.value | ||
elif reply.status.code == GcsCode.NotFound: | ||
return None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not an Awaitable[bytes]. The caller has to check that the return value is not None before calling await on the return object. It is better to return an Awaitable here that indicates the None situation (e.g., empty bytes) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think once the funciton id async function, it doesn't matter here. It'll be converted to await automatically:
|
||
else: | ||
raise RuntimeError( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this idiomatic in Python? To throw exceptions from a function that returns an Awaitable? In folly::future, you have the thenError() method to deal with async exception propagation. The way this method is currently, the caller has be ready for a future in the normal case, None is some cases, or an exception. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is valid. I feel the reason right now folly::future is doing that way is because there is no coroutine support in cpp. This is more like async operations in hacklang. |
||
f"Failed to get value for key {key} " | ||
f"due to error {reply.status.message}" | ||
) | ||
|
||
async def internal_kv_put( | ||
self, | ||
key: bytes, | ||
value: bytes, | ||
overwrite: bool, | ||
namespace: Optional[bytes], | ||
timeout: Optional[float] = None, | ||
) -> Awaitable[int]: | ||
logger.debug(f"internal_kv_put {key} {value} {overwrite} {namespace}") | ||
req = gcs_service_pb2.InternalKVPutRequest( | ||
namespace=namespace, | ||
key=key, | ||
value=value, | ||
overwrite=overwrite, | ||
) | ||
reply = await self._kv_stub.InternalKVPut(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.added_num | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about the NotFound case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the module lacks good documentation here. Let me put it in another PR. NotFound won't return here since this is PUT operations and the return is about how many keys were added. Here the return is wired, but this is a legacy reason since the first version of internal kv defines the return in this way and we need to keep backward compatible for now. If it's returned, it means something is wrong (a bug) |
||
raise RuntimeError( | ||
f"Failed to put value {value} to key {key} " | ||
f"due to error {reply.status.message}" | ||
) | ||
|
||
async def internal_kv_del( | ||
self, | ||
key: bytes, | ||
del_by_prefix: bool, | ||
namespace: Optional[bytes], | ||
timeout: Optional[float] = None, | ||
) -> Awaitable[int]: | ||
logger.debug(f"internal_kv_del {key} {del_by_prefix} {namespace}") | ||
req = gcs_service_pb2.InternalKVDelRequest( | ||
namespace=namespace, key=key, del_by_prefix=del_by_prefix | ||
) | ||
reply = await self._kv_stub.InternalKVDel(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.deleted_num | ||
else: | ||
raise RuntimeError( | ||
f"Failed to delete key {key} " f"due to error {reply.status.message}" | ||
) | ||
|
||
async def internal_kv_exists( | ||
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None | ||
) -> Awaitable[bool]: | ||
logger.debug(f"internal_kv_exists {key} {namespace}") | ||
req = gcs_service_pb2.InternalKVExistsRequest(namespace=namespace, key=key) | ||
reply = await self._kv_stub.InternalKVExists(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.exists | ||
else: | ||
raise RuntimeError( | ||
f"Failed to check existence of key {key} " | ||
f"due to error {reply.status.message}" | ||
) | ||
|
||
async def internal_kv_keys( | ||
self, prefix: bytes, namespace: Optional[bytes], timeout: Optional[float] = None | ||
) -> Awaitable[List[bytes]]: | ||
logger.debug(f"internal_kv_keys {prefix} {namespace}") | ||
req = gcs_service_pb2.InternalKVKeysRequest(namespace=namespace, prefix=prefix) | ||
reply = await self._kv_stub.InternalKVKeys(req, timeout=timeout) | ||
if reply.status.code == GcsCode.OK: | ||
return reply.results | ||
else: | ||
raise RuntimeError( | ||
f"Failed to list prefix {prefix} " | ||
f"due to error {reply.status.message}" | ||
) | ||
|
||
|
||
def use_gcs_for_bootstrap(): | ||
"""In the current version of Ray, we always use the GCS to bootstrap. | ||
(This was previously controlled by a feature flag.) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import os | ||
import sys | ||
import contextlib | ||
import signal | ||
import pytest | ||
import grpc | ||
import ray._private.gcs_utils as gcs_utils | ||
import ray | ||
|
||
|
||
@contextlib.contextmanager | ||
def stop_gcs_server(): | ||
process = ray.worker._global_node.all_processes[ | ||
ray.ray_constants.PROCESS_TYPE_GCS_SERVER | ||
][0].process | ||
pid = process.pid | ||
os.kill(pid, signal.SIGSTOP) | ||
yield | ||
os.kill(pid, signal.SIGCONT) | ||
|
||
|
||
def test_kv_basic(ray_start_regular): | ||
gcs_address = ray.worker.global_worker.gcs_client.address | ||
gcs_client = gcs_utils.GcsClient(address=gcs_address, nums_reconnect_retry=0) | ||
|
||
assert gcs_client.internal_kv_get(b"A", b"NS") is None | ||
assert gcs_client.internal_kv_put(b"A", b"B", False, b"NS") == 1 | ||
assert gcs_client.internal_kv_get(b"A", b"NS") == b"B" | ||
assert gcs_client.internal_kv_put(b"A", b"C", False, b"NS") == 0 | ||
assert gcs_client.internal_kv_get(b"A", b"NS") == b"B" | ||
assert gcs_client.internal_kv_put(b"A", b"C", True, b"NS") == 0 | ||
assert gcs_client.internal_kv_get(b"A", b"NS") == b"C" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a test for getting non-existent key? |
||
assert gcs_client.internal_kv_put(b"AA", b"B", False, b"NS") == 1 | ||
assert gcs_client.internal_kv_put(b"AB", b"B", False, b"NS") == 1 | ||
assert set(gcs_client.internal_kv_keys(b"A", b"NS")) == {b"A", b"AA", b"AB"} | ||
assert gcs_client.internal_kv_del(b"A", False, b"NS") == 1 | ||
assert set(gcs_client.internal_kv_keys(b"A", b"NS")) == {b"AA", b"AB"} | ||
assert gcs_client.internal_kv_keys(b"A", b"NSS") == [] | ||
assert gcs_client.internal_kv_del(b"A", True, b"NS") == 2 | ||
assert gcs_client.internal_kv_keys(b"A", b"NS") == [] | ||
assert gcs_client.internal_kv_del(b"A", False, b"NSS") == 0 | ||
|
||
|
||
@pytest.mark.skipif(sys.platform == "win32", reason="Windows doesn't have signals.") | ||
def test_kv_timeout(ray_start_regular): | ||
gcs_address = ray.worker.global_worker.gcs_client.address | ||
gcs_client = gcs_utils.GcsClient(address=gcs_address, nums_reconnect_retry=0) | ||
|
||
assert gcs_client.internal_kv_put(b"A", b"", False, b"") == 1 | ||
|
||
with stop_gcs_server(): | ||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
gcs_client.internal_kv_put(b"A", b"B", False, b"NS", timeout=2) | ||
|
||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
gcs_client.internal_kv_get(b"A", b"NS", timeout=2) | ||
|
||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
gcs_client.internal_kv_keys(b"A", b"NS", timeout=2) | ||
|
||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
gcs_client.internal_kv_del(b"A", True, b"NS", timeout=2) | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_kv_basic_aio(ray_start_regular): | ||
gcs_client = gcs_utils.GcsAioClient( | ||
address=ray.worker.global_worker.gcs_client.address | ||
) | ||
|
||
assert await gcs_client.internal_kv_get(b"A", b"NS") is None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sriram-anyscale the None case is also tested here |
||
assert await gcs_client.internal_kv_put(b"A", b"B", False, b"NS") == 1 | ||
assert await gcs_client.internal_kv_get(b"A", b"NS") == b"B" | ||
assert await gcs_client.internal_kv_put(b"A", b"C", False, b"NS") == 0 | ||
assert await gcs_client.internal_kv_get(b"A", b"NS") == b"B" | ||
assert await gcs_client.internal_kv_put(b"A", b"C", True, b"NS") == 0 | ||
assert await gcs_client.internal_kv_get(b"A", b"NS") == b"C" | ||
assert await gcs_client.internal_kv_put(b"AA", b"B", False, b"NS") == 1 | ||
assert await gcs_client.internal_kv_put(b"AB", b"B", False, b"NS") == 1 | ||
keys = await gcs_client.internal_kv_keys(b"A", b"NS") | ||
assert set(keys) == {b"A", b"AA", b"AB"} | ||
assert await gcs_client.internal_kv_del(b"A", False, b"NS") == 1 | ||
keys = await gcs_client.internal_kv_keys(b"A", b"NS") | ||
assert set(keys) == {b"AA", b"AB"} | ||
assert await gcs_client.internal_kv_keys(b"A", b"NSS") == [] | ||
assert await gcs_client.internal_kv_del(b"A", True, b"NS") == 2 | ||
assert await gcs_client.internal_kv_keys(b"A", b"NS") == [] | ||
assert await gcs_client.internal_kv_del(b"A", False, b"NSS") == 0 | ||
|
||
|
||
@pytest.mark.skipif(sys.platform == "win32", reason="Windows doesn't have signals.") | ||
@pytest.mark.asyncio | ||
async def test_kv_timeout_aio(ray_start_regular): | ||
gcs_client = gcs_utils.GcsAioClient( | ||
address=ray.worker.global_worker.gcs_client.address | ||
) | ||
# Make sure gcs_client is connected | ||
assert await gcs_client.internal_kv_put(b"A", b"", False, b"") == 1 | ||
|
||
with stop_gcs_server(): | ||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
await gcs_client.internal_kv_put(b"A", b"B", False, b"NS", timeout=2) | ||
|
||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
await gcs_client.internal_kv_get(b"A", b"NS", timeout=2) | ||
|
||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
await gcs_client.internal_kv_keys(b"A", b"NS", timeout=2) | ||
|
||
with pytest.raises(grpc.RpcError, match="Deadline Exceeded"): | ||
await gcs_client.internal_kv_del(b"A", True, b"NS", timeout=2) | ||
|
||
|
||
if __name__ == "__main__": | ||
import pytest | ||
import sys | ||
|
||
sys.exit(pytest.main(["-sv", __file__])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just noticed this "await" - doesn't this mean this statement blocks until the result is ready?
I think I get it now - it is the return type that is confusing - you are not really returning an Awaitable - you are just allowing the interpreter to do other things while "await"ing. So Philipp's comments are what you need to fix.