[Serve][Core] Fix serve_long_running memory leak by fixing GCS pubsub asyncio task leak (ray-project#29187)

Debugged with @simon-mo and @scv119. The Serve long-running release test was failing due to a memory leak in dashboard.py. The leak was in the GCS pubsub code: the _close: asyncio.Event object accumulated millions of waits every few minutes, and those waits were never cancelled, so the _close._waiters queue grew without bound. The root cause is that when awaiting with FIRST_COMPLETED, the caller is responsible for cancelling the unfinished task.
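
To illustrate the failure mode, here is a minimal, self-contained sketch of the leaky pattern (not Ray's code; the poll/close names only mirror the snippet in gcs_pubsub.py, and Event._waiters is a CPython implementation detail inspected solely to observe the leak):

import asyncio


async def leaky_poll(iterations: int = 1000) -> None:
    # Stand-ins for the real subscriber: "poll" finishes almost immediately,
    # while "close" only finishes when the (never-set) close event fires.
    close_event = asyncio.Event()
    for _ in range(iterations):
        poll = asyncio.ensure_future(asyncio.sleep(0))
        close = asyncio.ensure_future(close_event.wait())
        await asyncio.wait([poll, close], return_when=asyncio.FIRST_COMPLETED)
        # Bug: `close` is still pending and never cancelled, so the waiter it
        # registered on the event is never released.
    print("leaked Event waiters:", len(close_event._waiters))  # roughly `iterations`
    for task in asyncio.all_tasks() - {asyncio.current_task()}:
        task.cancel()  # tidy up the leaked tasks before the loop shuts down


asyncio.run(leaky_poll())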

This PR:

Fixes the memory leak by canceling the close task if it wasn't done. (Contributed by @simon-mo; a simplified sketch of the pattern appears after this list.)
This PR also adds some side improvements to the release test:

Use lower-memory instances so that memory leaks aren't hidden by the instances having a lot of available memory
Gracefully handle the case where wrk fails; previously this caused the release test output to be overwritten in a tight loop, which led to hard-to-interpret errors being surfaced to the release test infrastructure
Use different ports for the dashboard agents on the multiple cluster_utils virtual nodes to prevent port conflict
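
For contrast, a rough sketch of the fixed pattern (a simplified stand-in for the gcs_pubsub.py change shown in the diff below, reusing the same dummy poll/close tasks as the sketch above):

import asyncio


async def fixed_poll(iterations: int = 1000) -> None:
    close_event = asyncio.Event()
    for _ in range(iterations):
        poll = asyncio.get_event_loop().create_task(asyncio.sleep(0))
        close = asyncio.get_event_loop().create_task(close_event.wait())
        done, others = await asyncio.wait(
            [poll, close], return_when=asyncio.FIRST_COMPLETED
        )
        # Cancel whichever task did not finish; cancellation unwinds
        # Event.wait() and removes its waiter from the event's internal queue.
        other_task = others.pop()
        if not other_task.done():
            other_task.cancel()
    await asyncio.sleep(0)  # give the last cancellation a chance to run
    print("remaining Event waiters:", len(close_event._waiters))  # 0 or 1, not ~iterations


asyncio.run(fixed_poll())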

Signed-off-by: Weichen Xu <[email protected]>
architkulkarni authored and WeichenXu123 committed Dec 19, 2022
1 parent ce40544 commit 40381de
Showing 5 changed files with 69 additions and 8 deletions.
14 changes: 9 additions & 5 deletions python/ray/_private/gcs_pubsub.py
@@ -505,13 +505,17 @@ async def _poll_call(self, req, timeout=None):
     async def _poll(self, timeout=None) -> None:
         req = self._poll_request()
         while len(self._queue) == 0:
-            # TODO: use asyncio.create_task() after Python 3.6 is no longer
-            # supported.
-            poll = asyncio.ensure_future(self._poll_call(req, timeout=timeout))
-            close = asyncio.ensure_future(self._close.wait())
-            done, _ = await asyncio.wait(
+            poll = asyncio.get_event_loop().create_task(
+                self._poll_call(req, timeout=timeout)
+            )
+            close = asyncio.get_event_loop().create_task(self._close.wait())
+            done, others = await asyncio.wait(
                 [poll, close], timeout=timeout, return_when=asyncio.FIRST_COMPLETED
             )
+            # Cancel the other task if needed to prevent memory leak.
+            other_task = others.pop()
+            if not other_task.done():
+                other_task.cancel()
             if poll not in done or close in done:
                 # Request timed out or subscriber closed.
                 break
18 changes: 18 additions & 0 deletions python/ray/tests/test_gcs_pubsub.py
@@ -1,3 +1,4 @@
+import asyncio
 import sys
 import threading
 
@@ -140,6 +141,23 @@ async def test_aio_publish_and_subscribe_resource_usage(ray_start_regular):
     await subscriber.close()
 
 
+@pytest.mark.asyncio
+async def test_aio_poll_no_leaks(ray_start_regular):
+    """Test that polling doesn't leak memory."""
+    ctx = ray_start_regular
+    gcs_server_addr = ctx.address_info["gcs_address"]
+
+    subscriber = GcsAioResourceUsageSubscriber(address=gcs_server_addr)
+    await subscriber.subscribe()
+
+    for _ in range(10000):
+        subscriber.poll()
+        # There should only be 1 task, but use 10 as a buffer.
+        assert len(asyncio.all_tasks()) < 10
+
+    await subscriber.close()
+
+
 def test_two_subscribers(ray_start_regular):
     """Tests concurrently subscribing to two channels work."""
 
30 changes: 30 additions & 0 deletions release/long_running_tests/tpl_cpu_1_c5.yaml
@@ -0,0 +1,30 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 0
+
+head_node_type:
+  name: head_node
+  instance_type: c5.2xlarge
+
+worker_node_types:
+  - name: worker_node
+    instance_type: c5.2xlarge
+    min_workers: 0
+    max_workers: 0
+    use_spot: false
+
+aws:
+  TagSpecifications:
+    - ResourceType: "instance"
+      Tags:
+        - Key: anyscale-user
+          Value: '{{env["ANYSCALE_USER"]}}'
+        - Key: anyscale-expiration
+          Value: '{{env["EXPIRATION_2D"]}}'
+
+  BlockDeviceMappings:
+    - DeviceName: /dev/sda1
+      Ebs:
+        VolumeSize: 300
+        DeleteOnTermination: true
13 changes: 11 additions & 2 deletions release/long_running_tests/workloads/serve.py
@@ -48,6 +48,7 @@ def update_progress(result):
     cluster.add_node(
         redis_port=6379 if i == 0 else None,
         num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None,
+        dashboard_agent_listen_port=(52365 + i),
         num_cpus=8,
         num_gpus=0,
         resources={str(i): 2},
@@ -56,7 +57,7 @@ def update_progress(result):
         dashboard_host="0.0.0.0",
     )
 
-ray.init(address=cluster.address, dashboard_host="0.0.0.0")
+ray.init(address=cluster.address, log_to_driver=False, dashboard_host="0.0.0.0")
 serve.start()
 
 
@@ -101,7 +102,15 @@ async def __call__(self, request):
     )
     proc.wait()
     out, err = proc.communicate()
-
+    # Check if wrk command succeeded. If this happens repeatedly, the release test
+    # infrastructure will correctly fail the test with "Last update to results json
+    # was too long ago."
+    if proc.returncode != 0:
+        print("wrk failed with the following error: ")
+        print(err)
+        print("Will try again in 5 seconds")
+        time.sleep(5)
+        continue
     # Sample wrk stdout:
     #
     # Running 10s test @ http://127.0.0.1:8000/echo
2 changes: 1 addition & 1 deletion release/release_tests.yaml
@@ -2090,7 +2090,7 @@
   team: serve
   cluster:
     cluster_env: app_config.yaml
-    cluster_compute: tpl_cpu_1.yaml
+    cluster_compute: tpl_cpu_1_c5.yaml
 
   run:
     timeout: 86400
