diff --git a/python/ray/_private/gcs_pubsub.py b/python/ray/_private/gcs_pubsub.py index da10ee7539f7..669e7568e67b 100644 --- a/python/ray/_private/gcs_pubsub.py +++ b/python/ray/_private/gcs_pubsub.py @@ -505,17 +505,13 @@ async def _poll_call(self, req, timeout=None): async def _poll(self, timeout=None) -> None: req = self._poll_request() while len(self._queue) == 0: - poll = asyncio.get_event_loop().create_task( - self._poll_call(req, timeout=timeout) - ) - close = asyncio.get_event_loop().create_task(self._close.wait()) - done, others = await asyncio.wait( + # TODO: use asyncio.create_task() after Python 3.6 is no longer + # supported. + poll = asyncio.ensure_future(self._poll_call(req, timeout=timeout)) + close = asyncio.ensure_future(self._close.wait()) + done, _ = await asyncio.wait( [poll, close], timeout=timeout, return_when=asyncio.FIRST_COMPLETED ) - # Cancel the other task if needed to prevent memory leak. - other_task = others.pop() - if not other_task.done(): - other_task.cancel() if poll not in done or close in done: # Request timed out or subscriber closed. break diff --git a/python/ray/tests/test_gcs_pubsub.py b/python/ray/tests/test_gcs_pubsub.py index ae342050148b..936ae4e04c2f 100644 --- a/python/ray/tests/test_gcs_pubsub.py +++ b/python/ray/tests/test_gcs_pubsub.py @@ -1,4 +1,3 @@ -import asyncio import sys import threading @@ -141,23 +140,6 @@ async def test_aio_publish_and_subscribe_resource_usage(ray_start_regular): await subscriber.close() -@pytest.mark.asyncio -async def test_aio_poll_no_leaks(ray_start_regular): - """Test that polling doesn't leak memory.""" - ctx = ray_start_regular - gcs_server_addr = ctx.address_info["gcs_address"] - - subscriber = GcsAioResourceUsageSubscriber(address=gcs_server_addr) - await subscriber.subscribe() - - for _ in range(10000): - subscriber.poll() - # There should only be 1 task, but use 10 as a buffer. - assert len(asyncio.all_tasks()) < 10 - - await subscriber.close() - - def test_two_subscribers(ray_start_regular): """Tests concurrently subscribing to two channels work.""" diff --git a/release/long_running_tests/tpl_cpu_1_c5.yaml b/release/long_running_tests/tpl_cpu_1_c5.yaml deleted file mode 100644 index 1792ac27da21..000000000000 --- a/release/long_running_tests/tpl_cpu_1_c5.yaml +++ /dev/null @@ -1,30 +0,0 @@ -cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} -region: us-west-2 - -max_workers: 0 - -head_node_type: - name: head_node - instance_type: c5.2xlarge - -worker_node_types: - - name: worker_node - instance_type: c5.2xlarge - min_workers: 0 - max_workers: 0 - use_spot: false - -aws: - TagSpecifications: - - ResourceType: "instance" - Tags: - - Key: anyscale-user - Value: '{{env["ANYSCALE_USER"]}}' - - Key: anyscale-expiration - Value: '{{env["EXPIRATION_2D"]}}' - - BlockDeviceMappings: - - DeviceName: /dev/sda1 - Ebs: - VolumeSize: 300 - DeleteOnTermination: true diff --git a/release/long_running_tests/workloads/serve.py b/release/long_running_tests/workloads/serve.py index 2372242ba81f..936a7fa327aa 100644 --- a/release/long_running_tests/workloads/serve.py +++ b/release/long_running_tests/workloads/serve.py @@ -48,7 +48,6 @@ def update_progress(result): cluster.add_node( redis_port=6379 if i == 0 else None, num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None, - dashboard_agent_listen_port=(52365 + i), num_cpus=8, num_gpus=0, resources={str(i): 2}, @@ -57,7 +56,7 @@ def update_progress(result): dashboard_host="0.0.0.0", ) -ray.init(address=cluster.address, log_to_driver=False, dashboard_host="0.0.0.0") +ray.init(address=cluster.address, dashboard_host="0.0.0.0") serve.start() @@ -102,15 +101,7 @@ async def __call__(self, request): ) proc.wait() out, err = proc.communicate() - # Check if wrk command succeeded. If this happens repeatedly, the release test - # infrastructure will correctly fail the test with "Last update to results json - # was too long ago." - if proc.returncode != 0: - print("wrk failed with the following error: ") - print(err) - print("Will try again in 5 seconds") - time.sleep(5) - continue + # Sample wrk stdout: # # Running 10s test @ http://127.0.0.1:8000/echo diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 45e835d8a4e5..9ea369bad4f2 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -2090,7 +2090,7 @@ team: serve cluster: cluster_env: app_config.yaml - cluster_compute: tpl_cpu_1_c5.yaml + cluster_compute: tpl_cpu_1.yaml run: timeout: 86400