Skip to content

Commit

Permalink
Add docs to unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Kai Fricke <[email protected]>
  • Loading branch information
Kai Fricke committed Sep 29, 2022
1 parent 10d1760 commit ae757fe
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 0 deletions.
1 change: 1 addition & 0 deletions python/ray/air/execution/example/tune_simple.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ray import tune
from ray.air.execution.impl.tune.progress_loop import tune_run

# from ray.air.execution.resources.fixed import FixedResourceManager
from ray.air.execution.resources.placement_group import PlacementGroupResourceManager
from ray.tune.search import BasicVariantGenerator
Expand Down
1 change: 1 addition & 0 deletions python/ray/air/execution/resources/placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def cancel_resource_request(self, resources: ResourceRequest):
)

self._pg_to_request.pop(pg)
ray.util.remove_placement_group(pg)

def has_resources_ready(self, resources: ResourceRequest) -> bool:
if not bool(len(self._request_to_ready_pgs[resources])):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections import Counter

import pytest

import ray
Expand All @@ -17,20 +19,58 @@ def ray_start_4_cpus():
ray.shutdown()


def _count_pg_states():
counter = Counter()
for _, pg_info in ray.util.placement_group_table().items():
counter[pg_info["state"]] += 1
return counter


def test_request_cancel_resources(ray_start_4_cpus):
"""Test that canceling a resource request clears the PG futures.
- Create request
- Assert actual PG is created
- Cancel request
- Assert staging future is removed
- Assert actual PG is removed
"""
manager = PlacementGroupResourceManager(update_interval=0)
assert not manager.has_resources_ready(REQUEST_2_CPU)

manager.request_resources(REQUEST_2_CPU)

# Could be pending or created
pg_states = _count_pg_states()
assert pg_states["PENDING"] + pg_states["CREATED"] == 1
assert pg_states["REMOVED"] == 0

assert manager.get_resource_futures()

manager.cancel_resource_request(REQUEST_2_CPU)

assert not manager.get_resource_futures()

pg_states = _count_pg_states()
assert pg_states["PENDING"] + pg_states["CREATED"] == 0
assert pg_states["REMOVED"] == 1


def test_acquire_return_resources(ray_start_4_cpus):
"""Tests that acquiring and returning resources works.
- At the start, no resources should be ready (no PG scheduled)
- Request resources for 2 CPUs
- (wait until they are ready)
- Assert that these 2 CPUs are available to be acquired
- Acquire
- Assert that there are no 2 CPU resources available anymore
- Return, but don't cancel the request (keep PG)
- Assert that the 2 CPU resources are available again
- Cancel request
- Assert that the 2 CPU resources are not available anymore
- This is also tested in includes test_request_cancel_resources
"""
manager = PlacementGroupResourceManager(update_interval=0)
assert not manager.has_resources_ready(REQUEST_2_CPU)

Expand All @@ -42,6 +82,11 @@ def test_acquire_return_resources(ray_start_4_cpus):

assert manager.has_resources_ready(REQUEST_2_CPU)

# PG exists
pg_states = _count_pg_states()
assert pg_states["CREATED"] == 1
assert pg_states["REMOVED"] == 0

# Acquire PG
acquired = manager.acquire_resources(REQUEST_2_CPU)

Expand All @@ -52,13 +97,34 @@ def test_acquire_return_resources(ray_start_4_cpus):

assert manager.has_resources_ready(REQUEST_2_CPU)

# PG still exists
pg_states = _count_pg_states()
assert pg_states["CREATED"] == 1
assert pg_states["REMOVED"] == 0

# Cancel request
manager.cancel_resource_request(acquired.request)

assert not manager.has_resources_ready(REQUEST_2_CPU)

# PG removed
pg_states = _count_pg_states()
assert pg_states["CREATED"] == 0
assert pg_states["REMOVED"] == 1


def test_request_pending(ray_start_4_cpus):
"""Test that requesting too many resources leads to pending PGs.
- Cluster of 4 CPUs
- Request 3 PGs a 2 CPUs
- Acquire 2 PGs
- Assert no resources are available anymore
- Return both PGs
- Assert resources are available again
- Cancel request
- Assert no resources are available again
"""
manager = PlacementGroupResourceManager(update_interval=0)
assert not manager.has_resources_ready(REQUEST_2_CPU)

Expand All @@ -72,6 +138,11 @@ def test_request_pending(ray_start_4_cpus):
assert manager.has_resources_ready(REQUEST_2_CPU)
assert len(manager.get_resource_futures()) == 1

pg_states = _count_pg_states()
assert pg_states["CREATED"] == 2
assert pg_states["PENDING"] == 1
assert pg_states["REMOVED"] == 0

acq1 = manager.acquire_resources(REQUEST_2_CPU)
acq2 = manager.acquire_resources(REQUEST_2_CPU)

Expand All @@ -85,11 +156,30 @@ def test_request_pending(ray_start_4_cpus):
assert not manager.get_resource_futures()
assert manager.has_resources_ready(REQUEST_2_CPU)

pg_states = _count_pg_states()
assert pg_states["CREATED"] == 1
assert pg_states["PENDING"] == 0
assert pg_states["REMOVED"] == 2

manager.cancel_resource_request(REQUEST_2_CPU)
assert not manager.has_resources_ready(REQUEST_2_CPU)

pg_states = _count_pg_states()
assert pg_states["CREATED"] == 0
assert pg_states["PENDING"] == 0
assert pg_states["REMOVED"] == 3


def test_acquire_unavailable(ray_start_4_cpus):
"""Test that acquiring resources that are not available returns None.
- Try to acquire
- Assert this does not work
- Request resources
- Wait until ready
- Acquire
- Assert this did work
"""
manager = PlacementGroupResourceManager(update_interval=0)
assert not manager.acquire_resources(REQUEST_2_CPU)

Expand All @@ -99,6 +189,12 @@ def test_acquire_unavailable(ray_start_4_cpus):


def test_bind_two_bundles(ray_start_4_cpus):
"""Test that binding two remote objects to a ready resource works.
- Request PG with 2 bundles (1 CPU and 2 CPUs)
- Bind two remote tasks to these bundles, execute
- Assert that resource allocation returns the correct resources: 1 CPU and 2 CPUs
"""
manager = PlacementGroupResourceManager(update_interval=0)
manager.request_resources(REQUEST_1_2_CPU)
ray.wait(manager.get_resource_futures(), num_returns=1)
Expand Down Expand Up @@ -126,6 +222,12 @@ def get_assigned_resources():


def test_bind_empty_head_bundle(ray_start_4_cpus):
"""Test that binding two remote objects to a ready resource works with empty head.
- Request PG with 2 bundles (0 CPU and 2 CPUs)
- Bind two remote tasks to these bundles, execute
- Assert that resource allocation returns the correct resources: 0 CPU and 2 CPUs
"""
manager = PlacementGroupResourceManager(update_interval=0)
assert REQUEST_0_2_CPU.head_bundle_is_empty
manager.request_resources(REQUEST_0_2_CPU)
Expand Down

0 comments on commit ae757fe

Please sign in to comment.