diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 6a0770e9981b..d91165637b60 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -508,10 +508,12 @@ def client_table(self): if message is None: return [] - node_info = [] + node_info = {} gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( message, 0) + # Since GCS entries are append-only, we override so that + # only the latest entries are kept. for i in range(gcs_entry.EntriesLength()): client = ( ray.gcs_utils.ClientTableData.GetRootAsClientTableData( @@ -522,8 +524,18 @@ def client_table(self): client.ResourcesTotalCapacity(i) for i in range(client.ResourcesTotalLabelLength()) } - node_info.append({ - "ClientID": ray.utils.binary_to_hex(client.ClientId()), + client_id = ray.utils.binary_to_hex(client.ClientId()) + + # If this client is being removed, then it must + # have previously been inserted, and + # it cannot have previously been removed. + if not client.IsInsertion(): + assert client_id in node_info, "Client removed not found!" + assert node_info[client_id]["IsInsertion"], ( + "Unexpected duplicate removal of client.") + + node_info[client_id] = { + "ClientID": client_id, "IsInsertion": client.IsInsertion(), "NodeManagerAddress": decode(client.NodeManagerAddress()), "NodeManagerPort": client.NodeManagerPort(), @@ -532,8 +544,8 @@ def client_table(self): client.ObjectStoreSocketName()), "RayletSocketName": decode(client.RayletSocketName()), "Resources": resources - }) - return node_info + } + return list(node_info.values()) def log_files(self): """Fetch and return a dictionary of log file names to outputs.