diff --git a/python/ray/_private/gcs_pubsub.py b/python/ray/_private/gcs_pubsub.py index 669e7568e67b..da10ee7539f7 100644 --- a/python/ray/_private/gcs_pubsub.py +++ b/python/ray/_private/gcs_pubsub.py @@ -505,13 +505,17 @@ async def _poll_call(self, req, timeout=None): async def _poll(self, timeout=None) -> None: req = self._poll_request() while len(self._queue) == 0: - # TODO: use asyncio.create_task() after Python 3.6 is no longer - # supported. - poll = asyncio.ensure_future(self._poll_call(req, timeout=timeout)) - close = asyncio.ensure_future(self._close.wait()) - done, _ = await asyncio.wait( + poll = asyncio.get_event_loop().create_task( + self._poll_call(req, timeout=timeout) + ) + close = asyncio.get_event_loop().create_task(self._close.wait()) + done, others = await asyncio.wait( [poll, close], timeout=timeout, return_when=asyncio.FIRST_COMPLETED ) + # Cancel the other task if needed to prevent memory leak. + other_task = others.pop() + if not other_task.done(): + other_task.cancel() if poll not in done or close in done: # Request timed out or subscriber closed. break diff --git a/python/ray/tests/test_gcs_pubsub.py b/python/ray/tests/test_gcs_pubsub.py index 936ae4e04c2f..ae342050148b 100644 --- a/python/ray/tests/test_gcs_pubsub.py +++ b/python/ray/tests/test_gcs_pubsub.py @@ -1,3 +1,4 @@ +import asyncio import sys import threading @@ -140,6 +141,23 @@ async def test_aio_publish_and_subscribe_resource_usage(ray_start_regular): await subscriber.close() +@pytest.mark.asyncio +async def test_aio_poll_no_leaks(ray_start_regular): + """Test that polling doesn't leak memory.""" + ctx = ray_start_regular + gcs_server_addr = ctx.address_info["gcs_address"] + + subscriber = GcsAioResourceUsageSubscriber(address=gcs_server_addr) + await subscriber.subscribe() + + for _ in range(10000): + subscriber.poll() + # There should only be 1 task, but use 10 as a buffer. + assert len(asyncio.all_tasks()) < 10 + + await subscriber.close() + + def test_two_subscribers(ray_start_regular): """Tests concurrently subscribing to two channels work.""" diff --git a/release/long_running_tests/tpl_cpu_1_c5.yaml b/release/long_running_tests/tpl_cpu_1_c5.yaml new file mode 100644 index 000000000000..1792ac27da21 --- /dev/null +++ b/release/long_running_tests/tpl_cpu_1_c5.yaml @@ -0,0 +1,30 @@ +cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} +region: us-west-2 + +max_workers: 0 + +head_node_type: + name: head_node + instance_type: c5.2xlarge + +worker_node_types: + - name: worker_node + instance_type: c5.2xlarge + min_workers: 0 + max_workers: 0 + use_spot: false + +aws: + TagSpecifications: + - ResourceType: "instance" + Tags: + - Key: anyscale-user + Value: '{{env["ANYSCALE_USER"]}}' + - Key: anyscale-expiration + Value: '{{env["EXPIRATION_2D"]}}' + + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 300 + DeleteOnTermination: true diff --git a/release/long_running_tests/workloads/serve.py b/release/long_running_tests/workloads/serve.py index 936a7fa327aa..2372242ba81f 100644 --- a/release/long_running_tests/workloads/serve.py +++ b/release/long_running_tests/workloads/serve.py @@ -48,6 +48,7 @@ def update_progress(result): cluster.add_node( redis_port=6379 if i == 0 else None, num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None, + dashboard_agent_listen_port=(52365 + i), num_cpus=8, num_gpus=0, resources={str(i): 2}, @@ -56,7 +57,7 @@ def update_progress(result): dashboard_host="0.0.0.0", ) -ray.init(address=cluster.address, dashboard_host="0.0.0.0") +ray.init(address=cluster.address, log_to_driver=False, dashboard_host="0.0.0.0") serve.start() @@ -101,7 +102,15 @@ async def __call__(self, request): ) proc.wait() out, err = proc.communicate() - + # Check if wrk command succeeded. If this happens repeatedly, the release test + # infrastructure will correctly fail the test with "Last update to results json + # was too long ago." + if proc.returncode != 0: + print("wrk failed with the following error: ") + print(err) + print("Will try again in 5 seconds") + time.sleep(5) + continue # Sample wrk stdout: # # Running 10s test @ http://127.0.0.1:8000/echo diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 9ea369bad4f2..45e835d8a4e5 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -2090,7 +2090,7 @@ team: serve cluster: cluster_env: app_config.yaml - cluster_compute: tpl_cpu_1.yaml + cluster_compute: tpl_cpu_1_c5.yaml run: timeout: 86400