Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] warn when serve.start() with different options #21562

Merged
merged 5 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ def __init__(self,
self._shutdown = False
self._http_config: HTTPOptions = ray.get(
controller.get_http_config.remote())
self._root_url = ray.get(self._controller.get_root_url.remote())
self._root_url = ray.get(controller.get_root_url.remote())
self._checkpoint_path = ray.get(
controller.get_checkpoint_path.remote())

# Each handle has the overhead of long poll client, therefore cached.
self.handle_cache = dict()
Expand All @@ -126,6 +128,14 @@ def shutdown_serve_client():
def root_url(self):
return self._root_url

@property
def http_config(self):
return self._http_config

@property
def checkpoint_path(self):
return self._checkpoint_path

def __del__(self):
if not self._detached:
logger.debug("Shutting down Ray Serve because client went out of "
Expand Down Expand Up @@ -365,6 +375,35 @@ def get_handle(
return handle


def _check_http_and_checkpoint_options(
client: Client,
http_options: Union[dict, HTTPOptions],
checkpoint_path: str,
) -> None:
if checkpoint_path and checkpoint_path != client.checkpoint_path:
logger.warning(
f"The new client checkpoint path '{checkpoint_path}' "
f"is different from the existing one '{client.checkpoint_path}'. "
"The new checkpoint path is ignored.")

if http_options:
client_http_options = client.http_config
new_http_options = http_options if isinstance(
http_options, HTTPOptions) else HTTPOptions.parse_obj(http_options)
different_fields = []
all_http_option_fields = new_http_options.__dict__
for field in all_http_option_fields:
if getattr(new_http_options, field) != getattr(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't know what middlewares are used, we do a shallow check for each item in the list assuming that the ordering cannot be changed.

client_http_options, field):
different_fields.append(field)

if len(different_fields):
logger.warning(
"The new client HTTP config differs from the existing one "
f"in the following fields: {different_fields}. "
"The new HTTP config is ignored.")


@PublicAPI(stability="beta")
def start(
detached: bool = False,
Expand Down Expand Up @@ -425,6 +464,10 @@ def start(
client = _get_global_client()
logger.info("Connecting to existing Serve instance in namespace "
f"'{controller_namespace}'.")

_check_http_and_checkpoint_options(client, http_options,
_checkpoint_path)

return client
except RayServeException:
pass
Expand Down
4 changes: 4 additions & 0 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async def __init__(self,
# Used to read/write checkpoints.
self.controller_namespace = ray.get_runtime_context().namespace
self.controller_name = controller_name
self.checkpoint_path = checkpoint_path
kv_store_namespace = (
f"{self.controller_name}-{self.controller_namespace}")
self.kv_store = make_kv_store(
Expand Down Expand Up @@ -129,6 +130,9 @@ async def listen_for_change(self, keys_to_snapshot_ids: Dict[str, int]):
return await (
self.long_poll_host.listen_for_change(keys_to_snapshot_ids))

def get_checkpoint_path(self) -> str:
return self.checkpoint_path

def get_all_endpoints(self) -> Dict[EndpointTag, Dict[str, Any]]:
"""Returns a dictionary of deployment name to config."""
return self.endpoint_state.get_endpoints()
Expand Down
35 changes: 35 additions & 0 deletions python/ray/serve/tests/test_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,5 +607,40 @@ def verify_snapshot():
assert hello_deployment["status"] == "RUNNING"


def test_serve_start_different_http_checkpoint_options_warning(caplog):
import logging
from tempfile import mkstemp
from ray.serve.utils import logger
from ray._private.services import new_port

caplog.set_level(logging.WARNING, logger="ray.serve")

warning_msg = []

class WarningHandler(logging.Handler):
def emit(self, record):
warning_msg.append(self.format(record))

logger.addHandler(WarningHandler())

ray.init(namespace="serve-test")
serve.start(detached=True)

# create a different config
test_http = dict(host="127.1.1.8", port=new_port())
_, tmp_path = mkstemp()
test_ckpt = f"file://{tmp_path}"

serve.start(
detached=True, http_options=test_http, _checkpoint_path=test_ckpt)

for test_config, msg in zip([[test_ckpt], ["host", "port"]], warning_msg):
for test_msg in test_config:
assert test_msg in msg

serve.shutdown()
ray.shutdown()


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))