Skip to content

Commit

Permalink
Update task_table and object_table API. (#3161)
Browse files Browse the repository at this point in the history
* Update task_table and object_table API.

* Fix
  • Loading branch information
robertnishihara authored and pcmoritz committed Oct 31, 2018
1 parent 9df2e6e commit 1f29a96
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 134 deletions.
96 changes: 49 additions & 47 deletions python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,26 @@ def _object_table(self, object_id):
message = self._execute_command(object_id, "RAY.TABLE_LOOKUP",
ray.gcs_utils.TablePrefix.OBJECT, "",
object_id.id())
result = []
gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)

for i in range(gcs_entry.EntriesLength()):
assert gcs_entry.EntriesLength() > 0

entry = ray.gcs_utils.ObjectTableData.GetRootAsObjectTableData(
gcs_entry.Entries(0), 0)

object_info = {
"DataSize": entry.ObjectSize(),
"Manager": entry.Manager(),
"IsEviction": [entry.IsEviction()],
}

for i in range(1, gcs_entry.EntriesLength()):
entry = ray.gcs_utils.ObjectTableData.GetRootAsObjectTableData(
gcs_entry.Entries(i), 0)
object_info = {
"DataSize": entry.ObjectSize(),
"Manager": entry.Manager(),
"IsEviction": entry.IsEviction(),
"NumEvictions": entry.NumEvictions()
}
result.append(object_info)
object_info["IsEviction"].append(entry.IsEviction())

return result
return object_info

def object_table(self, object_id=None):
"""Fetch and parse the object table info for one or more object IDs.
Expand Down Expand Up @@ -224,44 +228,42 @@ def _task_table(self, task_id):
gcs_entries = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)

info = []
for i in range(gcs_entries.EntriesLength()):
task_table_message = ray.gcs_utils.Task.GetRootAsTask(
gcs_entries.Entries(i), 0)

execution_spec = task_table_message.TaskExecutionSpec()
task_spec = task_table_message.TaskSpecification()
task_spec = ray.raylet.task_from_string(task_spec)
task_spec_info = {
"DriverID": binary_to_hex(task_spec.driver_id().id()),
"TaskID": binary_to_hex(task_spec.task_id().id()),
"ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()),
"ParentCounter": task_spec.parent_counter(),
"ActorID": binary_to_hex(task_spec.actor_id().id()),
"ActorCreationID": binary_to_hex(
task_spec.actor_creation_id().id()),
"ActorCreationDummyObjectID": binary_to_hex(
task_spec.actor_creation_dummy_object_id().id()),
"ActorCounter": task_spec.actor_counter(),
"FunctionID": binary_to_hex(task_spec.function_id().id()),
"Args": task_spec.arguments(),
"ReturnObjectIDs": task_spec.returns(),
"RequiredResources": task_spec.required_resources()
}

info.append({
"ExecutionSpec": {
"Dependencies": [
execution_spec.Dependencies(i)
for i in range(execution_spec.DependenciesLength())
],
"LastTimestamp": execution_spec.LastTimestamp(),
"NumForwards": execution_spec.NumForwards()
},
"TaskSpec": task_spec_info
})
assert gcs_entries.EntriesLength() == 1

task_table_message = ray.gcs_utils.Task.GetRootAsTask(
gcs_entries.Entries(0), 0)

execution_spec = task_table_message.TaskExecutionSpec()
task_spec = task_table_message.TaskSpecification()
task_spec = ray.raylet.task_from_string(task_spec)
task_spec_info = {
"DriverID": binary_to_hex(task_spec.driver_id().id()),
"TaskID": binary_to_hex(task_spec.task_id().id()),
"ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()),
"ParentCounter": task_spec.parent_counter(),
"ActorID": binary_to_hex(task_spec.actor_id().id()),
"ActorCreationID": binary_to_hex(
task_spec.actor_creation_id().id()),
"ActorCreationDummyObjectID": binary_to_hex(
task_spec.actor_creation_dummy_object_id().id()),
"ActorCounter": task_spec.actor_counter(),
"FunctionID": binary_to_hex(task_spec.function_id().id()),
"Args": task_spec.arguments(),
"ReturnObjectIDs": task_spec.returns(),
"RequiredResources": task_spec.required_resources()
}

return info
return {
"ExecutionSpec": {
"Dependencies": [
execution_spec.Dependencies(i)
for i in range(execution_spec.DependenciesLength())
],
"LastTimestamp": execution_spec.LastTimestamp(),
"NumForwards": execution_spec.NumForwards()
},
"TaskSpec": task_spec_info
}

def task_table(self, task_id=None):
"""Fetch and parse the task table information for one or more task IDs.
Expand Down
9 changes: 3 additions & 6 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,8 @@ def _xray_clean_up_entries_for_driver(self, driver_id):
task_table_objects = self.state.task_table()
driver_id_hex = binary_to_hex(driver_id)
driver_task_id_bins = set()
for task_id_hex in task_table_objects:
if len(task_table_objects[task_id_hex]) == 0:
continue
task_table_object = task_table_objects[task_id_hex][0]["TaskSpec"]
for task_id_hex, task_info in task_table_objects.items():
task_table_object = task_info["TaskSpec"]
task_driver_id_hex = task_table_object["DriverID"]
if driver_id_hex != task_driver_id_hex:
# Ignore tasks that aren't from this driver.
Expand All @@ -165,8 +163,7 @@ def _xray_clean_up_entries_for_driver(self, driver_id):
# Get objects associated with the driver.
object_table_objects = self.state.object_table()
driver_object_id_bins = set()
for object_id, object_table_object in object_table_objects.items():
assert len(object_table_object) > 0
for object_id, _ in object_table_objects.items():
task_id_bin = ray.raylet.compute_task_id(object_id).id()
if task_id_bin in driver_task_id_bins:
driver_object_id_bins.add(object_id.id())
Expand Down
85 changes: 4 additions & 81 deletions test/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2115,8 +2115,7 @@ def test_global_state_api(shutdown_only):
task_table = ray.global_state.task_table()
assert len(task_table) == 1
assert driver_task_id == list(task_table.keys())[0]
assert len(task_table[driver_task_id]) == 1
task_spec = task_table[driver_task_id][0]["TaskSpec"]
task_spec = task_table[driver_task_id]["TaskSpec"]

assert task_spec["TaskID"] == driver_task_id
assert task_spec["ActorID"] == ray_constants.ID_SIZE * "ff"
Expand Down Expand Up @@ -2147,7 +2146,7 @@ def f(*xs):
task_id = list(task_id_set)[0]

function_table = ray.global_state.function_table()
task_spec = task_table[task_id][0]["TaskSpec"]
task_spec = task_table[task_id]["TaskSpec"]
assert task_spec["ActorID"] == ray_constants.ID_SIZE * "ff"
assert task_spec["Args"] == [1, "hi", x_id]
assert task_spec["DriverID"] == driver_id
Expand Down Expand Up @@ -2178,13 +2177,9 @@ def wait_for_object_table():
object_table = ray.global_state.object_table()
assert len(object_table) == 2

assert len(object_table[x_id]) == 1
assert object_table[x_id][0]["IsEviction"] is False
assert object_table[x_id][0]["NumEvictions"] == 0
assert object_table[x_id]["IsEviction"][0] is False

assert len(object_table[result_id]) == 1
assert object_table[result_id][0]["IsEviction"] is False
assert object_table[result_id][0]["NumEvictions"] == 0
assert object_table[result_id]["IsEviction"][0] is False

assert object_table[x_id] == ray.global_state.object_table(x_id)
object_table_entry = ray.global_state.object_table(result_id)
Expand Down Expand Up @@ -2251,78 +2246,6 @@ def f():
assert "stdout_file" in info


@pytest.mark.skip("This test does not work yet.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="New GCS API doesn't have a Python API yet.")
def test_flush_api(shutdown_only):
ray.init(num_cpus=1)

@ray.remote
def f():
return 1

[ray.put(1) for _ in range(10)]
ray.get([f.remote() for _ in range(10)])

# Wait until all of the task and object information has been stored in
# Redis. Note that since a given key may be updated multiple times
# (e.g., multiple calls to TaskTableUpdate), this is an attempt to wait
# until all updates have happened. Note that in a real application we
# could encounter this kind of issue as well.
while True:
object_table = ray.global_state.object_table()
task_table = ray.global_state.task_table()

tables_ready = True

if len(object_table) != 20:
tables_ready = False

for object_info in object_table.values():
if len(object_info) != 5:
tables_ready = False
if (object_info["ManagerIDs"] is None
or object_info["DataSize"] == -1
or object_info["Hash"] == ""):
tables_ready = False

if len(task_table) != 10 + 1:
tables_ready = False

driver_task_id = ray.utils.binary_to_hex(
ray.worker.global_worker.current_task_id.id())

for info in task_table.values():
if info["State"] != ray.experimental.state.TASK_STATUS_DONE:
if info["TaskSpec"]["TaskID"] != driver_task_id:
tables_ready = False

if tables_ready:
break
# this test case is blocked sometimes, add this may fix the problem
time.sleep(0.1)

# Flush the tables.
ray.experimental.flush_redis_unsafe()
ray.experimental.flush_task_and_object_metadata_unsafe()

# Make sure the tables are empty.
assert len(ray.global_state.object_table()) == 0
assert len(ray.global_state.task_table()) == 0

# Run some more tasks.
ray.get([f.remote() for _ in range(10)])

while len(ray.global_state.task_table()) != 0:
time.sleep(0.1)
ray.experimental.flush_finished_tasks_unsafe()

# Make sure that we can call this method (but it won't do anything in
# this test case).
ray.experimental.flush_evicted_objects_unsafe()


@pytest.fixture
def shutdown_only_with_initialization_check():
yield None
Expand Down

0 comments on commit 1f29a96

Please sign in to comment.