Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Integrate GCS fault tolerance with ray serve. #25637

Merged
merged 27 commits into from
Jun 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion python/ray/serve/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os

from enum import Enum

#: Used for debugging to turn on DEBUG-level logs
Expand Down
1 change: 0 additions & 1 deletion python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ def deploy(
# TODO(architkulkarni): When a deployment is redeployed, even if
# the only change was num_replicas, the start_time_ms is refreshed.
# Is this the desired behaviour?

updating = self.deployment_state_manager.deploy(name, deployment_info)

if route_prefix is not None:
Expand Down
62 changes: 37 additions & 25 deletions python/ray/serve/storage/kv_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
import sqlite3
from typing import Optional

import ray
from ray import ray_constants
from ray._private.gcs_utils import GcsClient
from ray.serve.constants import RAY_SERVE_KV_TIMEOUT_S, SERVE_LOGGER_NAME
from ray.serve.storage.kv_store_base import KVStoreBase

try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
boto3 = None

import ray
from ray import ray_constants
from ray._private.gcs_utils import GcsClient

from ray.serve.constants import SERVE_LOGGER_NAME, RAY_SERVE_KV_TIMEOUT_S
from ray.serve.storage.kv_store_base import KVStoreBase

logger = logging.getLogger(SERVE_LOGGER_NAME)

Expand All @@ -24,6 +24,10 @@ def get_storage_key(namespace: str, storage_key: str) -> str:
return "{ns}-{key}".format(ns=namespace, key=storage_key)


class KVStoreError(Exception):
pass


class RayInternalKVStore(KVStoreBase):
"""Wraps ray's internal_kv with a namespace to avoid collisions.

Expand All @@ -38,7 +42,6 @@ def __init__(
raise TypeError("namespace must a string, got: {}.".format(type(namespace)))

self.gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

self.timeout = RAY_SERVE_KV_TIMEOUT_S
self.namespace = namespace or ""

Expand All @@ -57,13 +60,16 @@ def put(self, key: str, val: bytes) -> bool:
if not isinstance(val, bytes):
raise TypeError("val must be bytes, got: {}.".format(type(val)))

return self.gcs_client.internal_kv_put(
self.get_storage_key(key).encode(),
val,
overwrite=True,
namespace=ray_constants.KV_NAMESPACE_SERVE,
timeout=self.timeout,
)
try:
return self.gcs_client.internal_kv_put(
self.get_storage_key(key).encode(),
val,
overwrite=True,
namespace=ray_constants.KV_NAMESPACE_SERVE,
timeout=self.timeout,
)
except Exception as e:
shrekris-anyscale marked this conversation as resolved.
Show resolved Hide resolved
raise KVStoreError(e.code())

def get(self, key: str) -> Optional[bytes]:
"""Get the value associated with the given key from the store.
Expand All @@ -77,11 +83,14 @@ def get(self, key: str) -> Optional[bytes]:
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))

return self.gcs_client.internal_kv_get(
self.get_storage_key(key).encode(),
namespace=ray_constants.KV_NAMESPACE_SERVE,
timeout=self.timeout,
)
try:
return self.gcs_client.internal_kv_get(
self.get_storage_key(key).encode(),
namespace=ray_constants.KV_NAMESPACE_SERVE,
timeout=self.timeout,
)
except Exception as e:
raise KVStoreError(e.code())

def delete(self, key: str):
"""Delete the value associated with the given key from the store.
Expand All @@ -93,12 +102,15 @@ def delete(self, key: str):
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))

return self.gcs_client.internal_kv_del(
self.get_storage_key(key).encode(),
False,
namespace=ray_constants.KV_NAMESPACE_SERVE,
timeout=self.timeout,
)
try:
return self.gcs_client.internal_kv_del(
self.get_storage_key(key).encode(),
False,
namespace=ray_constants.KV_NAMESPACE_SERVE,
timeout=self.timeout,
)
except Exception as e:
raise KVStoreError(e.code())


class RayLocalKVStore(KVStoreBase):
Expand Down
5 changes: 3 additions & 2 deletions python/ray/serve/tests/storage_tests/test_kv_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from typing import Optional

import pytest
from ray.serve.constants import DEFAULT_CHECKPOINT_PATH

from ray._private.test_utils import simulate_storage
from ray.serve.constants import DEFAULT_CHECKPOINT_PATH
from ray.serve.storage.checkpoint_path import make_kv_store
from ray.serve.storage.kv_store import RayInternalKVStore, RayLocalKVStore, RayS3KVStore
from ray.serve.storage.kv_store_base import KVStoreBase
Expand Down Expand Up @@ -83,7 +84,7 @@ def test_external_kv_local_disk():

def test_external_kv_aws_s3():
with simulate_storage("s3", "serve-test") as uri:
from urllib.parse import urlparse, parse_qs
from urllib.parse import parse_qs, urlparse

o = urlparse(uri)
qs = parse_qs(o.query)
Expand Down
98 changes: 98 additions & 0 deletions python/ray/serve/tests/test_gcs_failure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import os
import sys

import grpc
import pytest
import requests

import ray
import ray.serve as serve
from ray._private.test_utils import wait_for_condition
from ray.serve.storage.kv_store import KVStoreError, RayInternalKVStore
from ray.tests.conftest import external_redis # noqa: F401


@pytest.fixture(scope="function")
def serve_ha(external_redis, monkeypatch): # noqa: F811
monkeypatch.setenv("RAY_SERVE_KV_TIMEOUT_S", "1")
address_info = ray.init(
num_cpus=36,
namespace="default_test_namespace",
_metrics_export_port=9999,
_system_config={"metrics_report_interval_ms": 1000, "task_retry_delay_ms": 50},
)
yield (address_info, serve.start(detached=True))
ray.shutdown()


def test_ray_internal_kv_timeout(serve_ha): # noqa: F811
# Firstly make sure it's working
kv1 = RayInternalKVStore()
kv1.put("1", b"1")
assert kv1.get("1") == b"1"

# Kill the GCS
ray.worker._global_node.kill_gcs_server()

with pytest.raises(KVStoreError) as e:
kv1.put("2", b"2")
assert e.value.args[0] in (
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.DEADLINE_EXCEEDED,
)


@pytest.mark.parametrize("use_handle", [False, True])
def test_controller_gcs_failure(serve_ha, use_handle): # noqa: F811
@serve.deployment
def d(*args):
return f"{os.getpid()}"

def call():
if use_handle:
ret = ray.get(d.get_handle().remote())
else:
ret = requests.get("http://localhost:8000/d").text
print("RET=", ret)
return ret

d.deploy()
pid = call()

# Kill the GCS
print("Kill GCS")
ray.worker._global_node.kill_gcs_server()

# Make sure pid doesn't change within 5s
with pytest.raises(Exception):
wait_for_condition(lambda: pid != call(), timeout=5, retry_interval_ms=1)

print("Start GCS")
ray.worker._global_node.start_gcs_server()

# Make sure nothing changed even when GCS is back
with pytest.raises(Exception):
wait_for_condition(lambda: call() != pid, timeout=4)

d.deploy()

# Make sure redeploy happens
assert pid != call()

pid = call()

print("Kill GCS")
ray.worker._global_node.kill_gcs_server()

# Redeploy should fail
with pytest.raises(KVStoreError):
d.options().deploy()

# TODO: Check status not change once ray serve cover rollback


if __name__ == "__main__":
# When GCS is down, right now some core worker members are not cleared
# properly in ray.shutdown. Given that this is not hi-pri issue,
# using --forked for isolation.
sys.exit(pytest.main(["-v", "-s", "--forked", __file__]))
1 change: 1 addition & 0 deletions python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,4 @@ myst-parser==0.15.2
myst-nb==0.13.1
jupytext==1.13.6
pytest-docker-tools
pytest-forked