From 7a36c47a421e7e6283ee2786c98538356d9ab788 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 14 Sep 2018 22:02:51 -0700 Subject: [PATCH 1/7] Try fix --- python/ray/experimental/state.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 6a0770e9981b..c129f58e973a 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -508,10 +508,12 @@ def client_table(self): if message is None: return [] - node_info = [] + node_info = {} gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( message, 0) + # Since GCS entries are append-only, we override so that + # only the latest entries are kept. for i in range(gcs_entry.EntriesLength()): client = ( ray.gcs_utils.ClientTableData.GetRootAsClientTableData( @@ -522,8 +524,9 @@ def client_table(self): client.ResourcesTotalCapacity(i) for i in range(client.ResourcesTotalLabelLength()) } - node_info.append({ - "ClientID": ray.utils.binary_to_hex(client.ClientId()), + client_id = ray.utils.binary_to_hex(client.ClientId()) + node_info[client_id] = { + "ClientID": client_id, "IsInsertion": client.IsInsertion(), "NodeManagerAddress": decode(client.NodeManagerAddress()), "NodeManagerPort": client.NodeManagerPort(), @@ -532,8 +535,8 @@ def client_table(self): client.ObjectStoreSocketName()), "RayletSocketName": decode(client.RayletSocketName()), "Resources": resources - }) - return node_info + } + return list(node_info.values()) def log_files(self): """Fetch and return a dictionary of log file names to outputs. From 90e2f4ce08072978df29fec9c4a874c324c88ad3 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 14 Sep 2018 23:16:52 -0700 Subject: [PATCH 2/7] add strong assertions --- python/ray/experimental/state.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index c129f58e973a..ffe57179e5d7 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -525,6 +525,16 @@ def client_table(self): for i in range(client.ResourcesTotalLabelLength()) } client_id = ray.utils.binary_to_hex(client.ClientId()) + + # If this client is being removed, then it must + # have previously been inserted, and + # it cannot have previously been removed. + if not client.IsInsertion(): + assert client_id in node_info, \ + "Client removed not found!" + assert node_info[client_id]["IsInsertion"], \ + "Unexpected duplicate removal of client..." + node_info[client_id] = { "ClientID": client_id, "IsInsertion": client.IsInsertion(), From 8a1b614592231269e46ee6815401ce47351f1e02 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 15 Sep 2018 01:54:11 -0700 Subject: [PATCH 3/7] Add multi node test for state --- python/ray/test/test_global_state.py | 83 ++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 12 deletions(-) diff --git a/python/ray/test/test_global_state.py b/python/ray/test/test_global_state.py index e796d6013f33..1a184dc9f7b9 100644 --- a/python/ray/test/test_global_state.py +++ b/python/ray/test/test_global_state.py @@ -3,17 +3,11 @@ from __future__ import print_function import time - +import pytest +import subprocess import ray - -def setup_module(): - if not ray.worker.global_worker.connected: - ray.init(num_cpus=1) - - # Finish initializing Ray. Otherwise available_resources() does not - # reflect resource use of submitted tasks - ray.get(cpu_task.remote(0)) +REDIS_PORT = 6543 @ray.remote(num_cpus=1) @@ -21,14 +15,38 @@ def cpu_task(seconds): time.sleep(seconds) +def _get_raylet_pid(raylet_socket): + output = subprocess.check_output("ps -a".split(" ")) + all_processes_split = output.decode("ascii").split("\n") + search_term = "python/ray/core/src/ray/raylet/raylet {}".format( + raylet_socket) + print([x for x in all_processes_split if search_term in x]) + return [ + x.strip().split(" ")[0] for x in all_processes_split + if search_term in x + ][0] + + class TestAvailableResources(object): - timeout = 10 + @classmethod + def setup_class(cls): + if not ray.worker.global_worker.connected: + ray.init(num_cpus=1) + + # Finish initializing Ray. Otherwise available_resources() does not + # reflect resource use of submitted tasks + ray.get(cpu_task.remote(0)) + + @classmethod + def teardown_class(cls): + ray.shutdown() def test_no_tasks(self): cluster_resources = ray.global_state.cluster_resources() available_resources = ray.global_state.cluster_resources() assert cluster_resources == available_resources + @pytest.mark.timeout(10) def test_replenish_resources(self): cluster_resources = ray.global_state.cluster_resources() @@ -36,19 +54,20 @@ def test_replenish_resources(self): start = time.time() resources_reset = False - while not resources_reset and time.time() - start < self.timeout: + while not resources_reset: resources_reset = ( cluster_resources == ray.global_state.available_resources()) assert resources_reset + @pytest.mark.timeout(10) def test_uses_resources(self): cluster_resources = ray.global_state.cluster_resources() task_id = cpu_task.remote(1) start = time.time() resource_used = False - while not resource_used and time.time() - start < self.timeout: + while not resource_used: available_resources = ray.global_state.available_resources() resource_used = available_resources[ "CPU"] == cluster_resources["CPU"] - 1 @@ -56,3 +75,43 @@ def test_uses_resources(self): assert resource_used ray.get(task_id) # clean up to reset resources + + +class TestMultiNodeState(object): + @classmethod + def setup_class(cls): + subprocess.check_call("ray start --head --redis-port " + "{port} --num-cpus 1 --use-raylet".format( + port=REDIS_PORT).split(" ")) + ray.init(redis_address="localhost:{}".format(REDIS_PORT)) + + @classmethod + def teardown_class(cls): + subprocess.check_call("ray stop".split(" ")) + ray.shutdown() + + @pytest.mark.timeout(20) + def test_add_remove_client(self): + """Tests client table is correct after node removal.""" + clients = ray.global_state.client_table() + assert len(clients) == 1 + head_raylet_pid = _get_raylet_pid(clients[0]["RayletSocketName"]) + + subprocess.check_call( + "ray start --redis-address localhost:{port} " + "--num-cpus 1 --use-raylet".format(port=REDIS_PORT).split(" ")) + + clients = ray.global_state.client_table() + assert len(clients) == 2 + assert sum(cl["Resources"].get("CPU") for cl in clients) == 2 + + worker_raylet_pid = _get_raylet_pid(clients[1]["RayletSocketName"]) + assert head_raylet_pid != worker_raylet_pid + + subprocess.check_output(["kill", str(worker_raylet_pid)]) + + # wait for heartbeat + while all(cl_entries["IsInsertion"] for cl_entries in clients): + clients = ray.global_state.client_table() + time.sleep(1) + assert sum(cl["Resources"].get("CPU", 0) for cl in clients) == 1 From 0dc424aa5c91cb4631a17f1aacacd2ec7c05899b Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 15 Sep 2018 01:59:35 -0700 Subject: [PATCH 4/7] remove extra print --- python/ray/test/test_global_state.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/test/test_global_state.py b/python/ray/test/test_global_state.py index 1a184dc9f7b9..9a3f12a17518 100644 --- a/python/ray/test/test_global_state.py +++ b/python/ray/test/test_global_state.py @@ -20,7 +20,6 @@ def _get_raylet_pid(raylet_socket): all_processes_split = output.decode("ascii").split("\n") search_term = "python/ray/core/src/ray/raylet/raylet {}".format( raylet_socket) - print([x for x in all_processes_split if search_term in x]) return [ x.strip().split(" ")[0] for x in all_processes_split if search_term in x From f729e6ff36684b8144a70e6e2dd8debcae5ef9d7 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 15 Sep 2018 13:17:04 -0700 Subject: [PATCH 5/7] Minor --- python/ray/experimental/state.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index ffe57179e5d7..d91165637b60 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -530,10 +530,9 @@ def client_table(self): # have previously been inserted, and # it cannot have previously been removed. if not client.IsInsertion(): - assert client_id in node_info, \ - "Client removed not found!" - assert node_info[client_id]["IsInsertion"], \ - "Unexpected duplicate removal of client..." + assert client_id in node_info, "Client removed not found!" + assert node_info[client_id]["IsInsertion"], ( + "Unexpected duplicate removal of client.") node_info[client_id] = { "ClientID": client_id, From 6afeb4c43e1314787000e66949ac3c05798558c5 Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 16 Sep 2018 23:21:08 -0700 Subject: [PATCH 6/7] Revert "remove extra print" This reverts commit 0dc424aa5c91cb4631a17f1aacacd2ec7c05899b. --- python/ray/test/test_global_state.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/test/test_global_state.py b/python/ray/test/test_global_state.py index 9a3f12a17518..1a184dc9f7b9 100644 --- a/python/ray/test/test_global_state.py +++ b/python/ray/test/test_global_state.py @@ -20,6 +20,7 @@ def _get_raylet_pid(raylet_socket): all_processes_split = output.decode("ascii").split("\n") search_term = "python/ray/core/src/ray/raylet/raylet {}".format( raylet_socket) + print([x for x in all_processes_split if search_term in x]) return [ x.strip().split(" ")[0] for x in all_processes_split if search_term in x From 8123dc2d163035a290dba41ab81c26dd3920eaa4 Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 16 Sep 2018 23:21:09 -0700 Subject: [PATCH 7/7] Revert "Add multi node test for state" This reverts commit 8a1b614592231269e46ee6815401ce47351f1e02. --- python/ray/test/test_global_state.py | 83 ++++------------------------ 1 file changed, 12 insertions(+), 71 deletions(-) diff --git a/python/ray/test/test_global_state.py b/python/ray/test/test_global_state.py index 1a184dc9f7b9..e796d6013f33 100644 --- a/python/ray/test/test_global_state.py +++ b/python/ray/test/test_global_state.py @@ -3,11 +3,17 @@ from __future__ import print_function import time -import pytest -import subprocess + import ray -REDIS_PORT = 6543 + +def setup_module(): + if not ray.worker.global_worker.connected: + ray.init(num_cpus=1) + + # Finish initializing Ray. Otherwise available_resources() does not + # reflect resource use of submitted tasks + ray.get(cpu_task.remote(0)) @ray.remote(num_cpus=1) @@ -15,38 +21,14 @@ def cpu_task(seconds): time.sleep(seconds) -def _get_raylet_pid(raylet_socket): - output = subprocess.check_output("ps -a".split(" ")) - all_processes_split = output.decode("ascii").split("\n") - search_term = "python/ray/core/src/ray/raylet/raylet {}".format( - raylet_socket) - print([x for x in all_processes_split if search_term in x]) - return [ - x.strip().split(" ")[0] for x in all_processes_split - if search_term in x - ][0] - - class TestAvailableResources(object): - @classmethod - def setup_class(cls): - if not ray.worker.global_worker.connected: - ray.init(num_cpus=1) - - # Finish initializing Ray. Otherwise available_resources() does not - # reflect resource use of submitted tasks - ray.get(cpu_task.remote(0)) - - @classmethod - def teardown_class(cls): - ray.shutdown() + timeout = 10 def test_no_tasks(self): cluster_resources = ray.global_state.cluster_resources() available_resources = ray.global_state.cluster_resources() assert cluster_resources == available_resources - @pytest.mark.timeout(10) def test_replenish_resources(self): cluster_resources = ray.global_state.cluster_resources() @@ -54,20 +36,19 @@ def test_replenish_resources(self): start = time.time() resources_reset = False - while not resources_reset: + while not resources_reset and time.time() - start < self.timeout: resources_reset = ( cluster_resources == ray.global_state.available_resources()) assert resources_reset - @pytest.mark.timeout(10) def test_uses_resources(self): cluster_resources = ray.global_state.cluster_resources() task_id = cpu_task.remote(1) start = time.time() resource_used = False - while not resource_used: + while not resource_used and time.time() - start < self.timeout: available_resources = ray.global_state.available_resources() resource_used = available_resources[ "CPU"] == cluster_resources["CPU"] - 1 @@ -75,43 +56,3 @@ def test_uses_resources(self): assert resource_used ray.get(task_id) # clean up to reset resources - - -class TestMultiNodeState(object): - @classmethod - def setup_class(cls): - subprocess.check_call("ray start --head --redis-port " - "{port} --num-cpus 1 --use-raylet".format( - port=REDIS_PORT).split(" ")) - ray.init(redis_address="localhost:{}".format(REDIS_PORT)) - - @classmethod - def teardown_class(cls): - subprocess.check_call("ray stop".split(" ")) - ray.shutdown() - - @pytest.mark.timeout(20) - def test_add_remove_client(self): - """Tests client table is correct after node removal.""" - clients = ray.global_state.client_table() - assert len(clients) == 1 - head_raylet_pid = _get_raylet_pid(clients[0]["RayletSocketName"]) - - subprocess.check_call( - "ray start --redis-address localhost:{port} " - "--num-cpus 1 --use-raylet".format(port=REDIS_PORT).split(" ")) - - clients = ray.global_state.client_table() - assert len(clients) == 2 - assert sum(cl["Resources"].get("CPU") for cl in clients) == 2 - - worker_raylet_pid = _get_raylet_pid(clients[1]["RayletSocketName"]) - assert head_raylet_pid != worker_raylet_pid - - subprocess.check_output(["kill", str(worker_raylet_pid)]) - - # wait for heartbeat - while all(cl_entries["IsInsertion"] for cl_entries in clients): - clients = ray.global_state.client_table() - time.sleep(1) - assert sum(cl["Resources"].get("CPU", 0) for cl in clients) == 1