Commit 460369b

Revert "[Serve][Core] Fix serve_long_running memory leak by fixing GCS pubsub asyncio task leak (#29187)"

This reverts commit bced413.
architkulkarni committed Oct 11, 2022
1 parent bced413 commit 460369b
Showing 5 changed files with 8 additions and 69 deletions.
14 changes: 5 additions & 9 deletions python/ray/_private/gcs_pubsub.py
@@ -505,17 +505,13 @@ async def _poll_call(self, req, timeout=None):
     async def _poll(self, timeout=None) -> None:
         req = self._poll_request()
         while len(self._queue) == 0:
-            poll = asyncio.get_event_loop().create_task(
-                self._poll_call(req, timeout=timeout)
-            )
-            close = asyncio.get_event_loop().create_task(self._close.wait())
-            done, others = await asyncio.wait(
+            # TODO: use asyncio.create_task() after Python 3.6 is no longer
+            # supported.
+            poll = asyncio.ensure_future(self._poll_call(req, timeout=timeout))
+            close = asyncio.ensure_future(self._close.wait())
+            done, _ = await asyncio.wait(
                 [poll, close], timeout=timeout, return_when=asyncio.FIRST_COMPLETED
             )
-            # Cancel the other task if needed to prevent memory leak.
-            other_task = others.pop()
-            if not other_task.done():
-                other_task.cancel()
             if poll not in done or close in done:
                 # Request timed out or subscriber closed.
                 break
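
Why this hunk matters: asyncio.wait(..., return_when=FIRST_COMPLETED) returns as soon as one task finishes but leaves the losing task pending; unless the caller cancels it, each iteration of the polling loop strands another task on the event loop. A minimal standalone sketch of the losing-task cleanup that this revert removes (illustrative names, not Ray code):

    import asyncio

    async def race(poll_coro, close_event, timeout=None):
        # Wrap both awaitables in tasks so asyncio.wait can race them.
        poll = asyncio.ensure_future(poll_coro)
        close = asyncio.ensure_future(close_event.wait())
        done, pending = await asyncio.wait(
            [poll, close], timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        # Without this loop the losing task stays registered with the event
        # loop; calling race() repeatedly then leaks one task per call.
        for task in pending:
            task.cancel()
        return poll in done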
18 changes: 0 additions & 18 deletions python/ray/tests/test_gcs_pubsub.py
@@ -1,4 +1,3 @@
-import asyncio
 import sys
 import threading
 
@@ -141,23 +140,6 @@ async def test_aio_publish_and_subscribe_resource_usage(ray_start_regular):
     await subscriber.close()
 
 
-@pytest.mark.asyncio
-async def test_aio_poll_no_leaks(ray_start_regular):
-    """Test that polling doesn't leak memory."""
-    ctx = ray_start_regular
-    gcs_server_addr = ctx.address_info["gcs_address"]
-
-    subscriber = GcsAioResourceUsageSubscriber(address=gcs_server_addr)
-    await subscriber.subscribe()
-
-    for _ in range(10000):
-        subscriber.poll()
-        # There should only be 1 task, but use 10 as a buffer.
-        assert len(asyncio.all_tasks()) < 10
-
-    await subscriber.close()
-
-
 def test_two_subscribers(ray_start_regular):
     """Tests concurrently subscribing to two channels work."""
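
The deleted regression test used asyncio.all_tasks() as a cheap leak detector: poll many times and assert the task count stays flat. A standalone sketch of the same detection idea, with a deliberately leaky racer (illustrative code, not the Ray test):

    import asyncio

    async def leaky_iteration():
        # Race a fast task against a slow one and never cancel the loser.
        slow = asyncio.ensure_future(asyncio.sleep(60))
        fast = asyncio.ensure_future(asyncio.sleep(0))
        await asyncio.wait([slow, fast], return_when=asyncio.FIRST_COMPLETED)

    async def main():
        for _ in range(100):
            await leaky_iteration()
        # One stranded `slow` task per iteration plus the current task:
        # prints roughly 101 instead of staying near 1.
        print(len(asyncio.all_tasks()))

    asyncio.run(main())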
30 changes: 0 additions & 30 deletions release/long_running_tests/tpl_cpu_1_c5.yaml

This file was deleted.

13 changes: 2 additions & 11 deletions release/long_running_tests/workloads/serve.py
@@ -48,7 +48,6 @@ def update_progress(result):
     cluster.add_node(
         redis_port=6379 if i == 0 else None,
         num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None,
-        dashboard_agent_listen_port=(52365 + i),
         num_cpus=8,
         num_gpus=0,
         resources={str(i): 2},
@@ -57,7 +56,7 @@ def update_progress(result):
         dashboard_host="0.0.0.0",
     )
 
-ray.init(address=cluster.address, log_to_driver=False, dashboard_host="0.0.0.0")
+ray.init(address=cluster.address, dashboard_host="0.0.0.0")
 serve.start()
 
 
@@ -102,15 +101,7 @@ async def __call__(self, request):
     )
     proc.wait()
     out, err = proc.communicate()
-    # Check if wrk command succeeded. If this happens repeatedly, the release test
-    # infrastructure will correctly fail the test with "Last update to results json
-    # was too long ago."
-    if proc.returncode != 0:
-        print("wrk failed with the following error: ")
-        print(err)
-        print("Will try again in 5 seconds")
-        time.sleep(5)
-        continue
+
     # Sample wrk stdout:
     #
     # Running 10s test @ http://127.0.0.1:8000/echo
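
The last hunk drops a retry guard around the wrk benchmark invocation. The removed logic follows a common pattern: on a nonzero exit code, log stderr, back off briefly, and retry instead of crashing the workload. A minimal sketch of that pattern under assumed names (run_benchmark is illustrative, not from this repo):

    import subprocess
    import time

    def run_benchmark(cmd, retry_delay_s=5):
        # Loop until the command succeeds, sleeping between failed attempts.
        while True:
            proc = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
            )
            out, err = proc.communicate()
            if proc.returncode == 0:
                return out
            print(f"command failed, retrying in {retry_delay_s}s:\n{err}")
            time.sleep(retry_delay_s)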
2 changes: 1 addition & 1 deletion release/release_tests.yaml
@@ -2090,7 +2090,7 @@
   team: serve
   cluster:
     cluster_env: app_config.yaml
-    cluster_compute: tpl_cpu_1_c5.yaml
+    cluster_compute: tpl_cpu_1.yaml
 
   run:
     timeout: 86400
