Skip to content

Commit

Permalink
[Serve] Make long poll wait for non-existent keys (#19205)
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-mo authored Oct 8, 2021
1 parent 9f06648 commit 46e8034
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
22 changes: 9 additions & 13 deletions python/ray/serve/long_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,9 @@ def _process_update(self, updates: Dict[str, UpdatedObject]):
return

if isinstance(updates, (ray.exceptions.RayTaskError)):
# This can happen during shutdown where the controller doesn't
# contain this key, we will just repull.
# NOTE(simon): should we repull or just wait in the long poll
# host?
if not isinstance(updates.as_instanceof_cause(), ValueError):
logger.error("LongPollHost errored\n" + updates.traceback_str)
# Some error happened in the controller. It could be a bug or some
# undesired state.
logger.error("LongPollHost errored\n" + updates.traceback_str)
self._poll_next()
return

Expand Down Expand Up @@ -167,22 +164,21 @@ async def listen_for_change(
until there's one updates.
"""
watched_keys = keys_to_snapshot_ids.keys()
nonexistent_keys = set(watched_keys) - set(self.snapshot_ids.keys())
if len(nonexistent_keys) > 0:
raise ValueError(f"Keys not found: {nonexistent_keys}.")
existent_keys = set(watched_keys).intersection(
set(self.snapshot_ids.keys()))

# 2. If there are any outdated keys (by comparing snapshot ids)
# return immediately.
# If there are any outdated keys (by comparing snapshot ids)
# return immediately.
client_outdated_keys = {
key: UpdatedObject(self.object_snapshots[key],
self.snapshot_ids[key])
for key in watched_keys
for key in existent_keys
if self.snapshot_ids[key] != keys_to_snapshot_ids[key]
}
if len(client_outdated_keys) > 0:
return client_outdated_keys

# 3. Otherwise, register asyncio events to be waited.
# Otherwise, register asyncio events to be waited.
async_task_to_watched_keys = {}
for key in watched_keys:
# Create a new asyncio event for this key
Expand Down
14 changes: 14 additions & 0 deletions python/ray/serve/tests/test_long_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ def test_host_standalone(serve_instance):
assert "key_2" in result


def test_long_poll_wait_for_keys(serve_instance):
# Variation of the basic case, but the keys are requests before any values
# are set.
host = ray.remote(LongPollHost).remote()
object_ref = host.listen_for_change.remote({"key_1": -1, "key_2": -1})
ray.get(host.notify_changed.remote("key_1", 999))
ray.get(host.notify_changed.remote("key_2", 999))

# We should be able to get the one of the result immediately
result: Dict[str, UpdatedObject] = ray.get(object_ref)
assert set(result.keys()).issubset({"key_1", "key_2"})
assert {v.object_snapshot for v in result.values()} == {999}


def test_long_poll_restarts(serve_instance):
@ray.remote(
max_restarts=-1,
Expand Down

0 comments on commit 46e8034

Please sign in to comment.