Skip to content

Commit

Permalink
Fix typing of gcs_utils.py and add check to CI (#25285)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored May 31, 2022
1 parent a2e33e1 commit f61997d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
1 change: 1 addition & 0 deletions ci/lint/format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ MYPY_FILES=(
# in the CI. Type check once we get serious about type checking:
#'ray_operator/operator.py'
'ray_operator/operator_utils.py'
'_private/gcs_utils.py'
)

BLACK_EXCLUDES=(
Expand Down
54 changes: 27 additions & 27 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import enum
import logging
from typing import Awaitable, List, Optional
from typing import List, Optional
from functools import wraps
import time

Expand Down Expand Up @@ -211,8 +211,8 @@ def address(self):
@_auto_reconnect
def internal_kv_get(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> bytes:
logger.debug(f"internal_kv_get {key} {namespace}")
) -> Optional[bytes]:
logger.debug(f"internal_kv_get {key!r} {namespace!r}")
req = gcs_service_pb2.InternalKVGetRequest(namespace=namespace, key=key)
reply = self._kv_stub.InternalKVGet(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
Expand All @@ -221,7 +221,7 @@ def internal_kv_get(
return None
else:
raise RuntimeError(
f"Failed to get value for key {key} "
f"Failed to get value for key {key!r} "
f"due to error {reply.status.message}"
)

Expand All @@ -234,7 +234,7 @@ def internal_kv_put(
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> int:
logger.debug(f"internal_kv_put {key} {value} {overwrite} {namespace}")
logger.debug(f"internal_kv_put {key!r} {value!r} {overwrite} {namespace!r}")
req = gcs_service_pb2.InternalKVPutRequest(
namespace=namespace,
key=key,
Expand All @@ -246,7 +246,7 @@ def internal_kv_put(
return reply.added_num
else:
raise RuntimeError(
f"Failed to put value {value} to key {key} "
f"Failed to put value {value!r} to key {key!r} "
f"due to error {reply.status.message}"
)

Expand All @@ -258,7 +258,7 @@ def internal_kv_del(
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> int:
logger.debug(f"internal_kv_del {key} {del_by_prefix} {namespace}")
logger.debug(f"internal_kv_del {key!r} {del_by_prefix} {namespace!r}")
req = gcs_service_pb2.InternalKVDelRequest(
namespace=namespace, key=key, del_by_prefix=del_by_prefix
)
Expand All @@ -267,36 +267,36 @@ def internal_kv_del(
return reply.deleted_num
else:
raise RuntimeError(
f"Failed to delete key {key} " f"due to error {reply.status.message}"
f"Failed to delete key {key!r} " f"due to error {reply.status.message}"
)

@_auto_reconnect
def internal_kv_exists(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> bool:
logger.debug(f"internal_kv_exists {key} {namespace}")
logger.debug(f"internal_kv_exists {key!r} {namespace!r}")
req = gcs_service_pb2.InternalKVExistsRequest(namespace=namespace, key=key)
reply = self._kv_stub.InternalKVExists(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.exists
else:
raise RuntimeError(
f"Failed to check existence of key {key} "
f"Failed to check existence of key {key!r} "
f"due to error {reply.status.message}"
)

@_auto_reconnect
def internal_kv_keys(
self, prefix: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> List[bytes]:
logger.debug(f"internal_kv_keys {prefix} {namespace}")
logger.debug(f"internal_kv_keys {prefix!r} {namespace!r}")
req = gcs_service_pb2.InternalKVKeysRequest(namespace=namespace, prefix=prefix)
reply = self._kv_stub.InternalKVKeys(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.results
else:
raise RuntimeError(
f"Failed to list prefix {prefix} "
f"Failed to list prefix {prefix!r} "
f"due to error {reply.status.message}"
)

Expand Down Expand Up @@ -341,8 +341,8 @@ def _connect(self):

async def internal_kv_get(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> Awaitable[bytes]:
logger.debug(f"internal_kv_get {key} {namespace}")
) -> Optional[bytes]:
logger.debug(f"internal_kv_get {key!r} {namespace!r}")
req = gcs_service_pb2.InternalKVGetRequest(namespace=namespace, key=key)
reply = await self._kv_stub.InternalKVGet(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
Expand All @@ -351,7 +351,7 @@ async def internal_kv_get(
return None
else:
raise RuntimeError(
f"Failed to get value for key {key} "
f"Failed to get value for key {key!r} "
f"due to error {reply.status.message}"
)

Expand All @@ -362,8 +362,8 @@ async def internal_kv_put(
overwrite: bool,
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> Awaitable[int]:
logger.debug(f"internal_kv_put {key} {value} {overwrite} {namespace}")
) -> int:
logger.debug(f"internal_kv_put {key!r} {value!r} {overwrite} {namespace!r}")
req = gcs_service_pb2.InternalKVPutRequest(
namespace=namespace,
key=key,
Expand All @@ -375,7 +375,7 @@ async def internal_kv_put(
return reply.added_num
else:
raise RuntimeError(
f"Failed to put value {value} to key {key} "
f"Failed to put value {value!r} to key {key!r} "
f"due to error {reply.status.message}"
)

Expand All @@ -385,8 +385,8 @@ async def internal_kv_del(
del_by_prefix: bool,
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> Awaitable[int]:
logger.debug(f"internal_kv_del {key} {del_by_prefix} {namespace}")
) -> int:
logger.debug(f"internal_kv_del {key!r} {del_by_prefix} {namespace!r}")
req = gcs_service_pb2.InternalKVDelRequest(
namespace=namespace, key=key, del_by_prefix=del_by_prefix
)
Expand All @@ -395,34 +395,34 @@ async def internal_kv_del(
return reply.deleted_num
else:
raise RuntimeError(
f"Failed to delete key {key} " f"due to error {reply.status.message}"
f"Failed to delete key {key!r} " f"due to error {reply.status.message}"
)

async def internal_kv_exists(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> Awaitable[bool]:
logger.debug(f"internal_kv_exists {key} {namespace}")
) -> bool:
logger.debug(f"internal_kv_exists {key!r} {namespace!r}")
req = gcs_service_pb2.InternalKVExistsRequest(namespace=namespace, key=key)
reply = await self._kv_stub.InternalKVExists(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.exists
else:
raise RuntimeError(
f"Failed to check existence of key {key} "
f"Failed to check existence of key {key!r} "
f"due to error {reply.status.message}"
)

async def internal_kv_keys(
self, prefix: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> Awaitable[List[bytes]]:
logger.debug(f"internal_kv_keys {prefix} {namespace}")
) -> List[bytes]:
logger.debug(f"internal_kv_keys {prefix!r} {namespace!r}")
req = gcs_service_pb2.InternalKVKeysRequest(namespace=namespace, prefix=prefix)
reply = await self._kv_stub.InternalKVKeys(req, timeout=timeout)
if reply.status.code == GcsCode.OK:
return reply.results
else:
raise RuntimeError(
f"Failed to list prefix {prefix} "
f"Failed to list prefix {prefix!r} "
f"due to error {reply.status.message}"
)

Expand Down

0 comments on commit f61997d

Please sign in to comment.